From b1157f60f593f020334eb70e675291162b35547b Mon Sep 17 00:00:00 2001 From: pk5ls20 Date: Thu, 17 Oct 2024 03:03:36 +0800 Subject: [PATCH] feat & refactor: packet highway (in almost right impl) --- src/core/packet/highway/client.ts | 254 ++-------------------------- src/core/packet/highway/frame.ts | 23 +++ src/core/packet/highway/session.ts | 97 +++++------ src/core/packet/highway/uploader.ts | 204 ++++++++++++++++++++++ src/core/packet/highway/utils.ts | 20 +++ 5 files changed, 308 insertions(+), 290 deletions(-) create mode 100644 src/core/packet/highway/frame.ts create mode 100644 src/core/packet/highway/uploader.ts create mode 100644 src/core/packet/highway/utils.ts diff --git a/src/core/packet/highway/client.ts b/src/core/packet/highway/client.ts index 257b9e96..b605fb02 100644 --- a/src/core/packet/highway/client.ts +++ b/src/core/packet/highway/client.ts @@ -1,17 +1,13 @@ -import * as net from "node:net"; import * as stream from 'node:stream'; -import * as crypto from 'node:crypto'; -import * as tea from '@/core/packet/utils/crypto/tea'; -import {BlockSize, PacketHighwaySig} from "@/core/packet/highway/session"; -import {NapProtoMsg} from "@/core/packet/proto/NapProto"; -import {ReqDataHighwayHead, RespDataHighwayHead} from "@/core/packet/proto/highway/highway"; +import {PacketHighwaySig} from "@/core/packet/highway/session"; import {LogWrapper} from "@/common/log"; -import {createHash} from "crypto"; -import {toHexString} from "image-size/dist/types/utils"; +import {ReadStream} from "fs"; +import {HighwayHttpUploader, HighwayTcpUploader} from "@/core/packet/highway/uploader"; -interface PacketHighwayTrans { +export interface PacketHighwayTrans { uin: string; cmd: number; + command: string; data: stream.Readable; sum: Uint8Array; size: number; @@ -20,82 +16,8 @@ interface PacketHighwayTrans { ext: Uint8Array; encrypt: boolean; timeout?: number; -} - -class PacketHighwayTransform extends stream.Transform { - private seq: number = 0; - private readonly trans: PacketHighwayTrans; - private offset: number = 0; - - constructor(trans: PacketHighwayTrans) { - super(); - this.trans = trans; - } - - private nextSeq() { - console.log(`[Highway] nextSeq: ${this.seq}`); - this.seq += 2; - return this.seq; - } - - private encryptTrans(trans: PacketHighwayTrans, key: Uint8Array) { - if (!trans.encrypt) return; - trans.ext = tea.encrypt(Buffer.from(trans.ext), Buffer.from(key)); - } - - buildHead(trans: PacketHighwayTrans, offset: number, length: number, md5Hash: Uint8Array): Uint8Array { - return new NapProtoMsg(ReqDataHighwayHead).encode({ - msgBaseHead: { - version: 1, - uin: trans.uin, // TODO: - command: "PicUp.DataUp", - seq: this.nextSeq(), - retryTimes: 0, - appId: 537234773, - dataFlag: 16, - commandId: trans.cmd, - }, - msgSegHead: { - filesize: BigInt(trans.size), - dataOffset: BigInt(offset), - dataLength: length, - serviceTicket: trans.ticket, - md5: md5Hash, - fileMd5: trans.sum, - }, - bytesReqExtendInfo: trans.ext, - timestamp: BigInt(Date.now()), - msgLoginSigHead: { - uint32LoginSigType: 8, - appId: 1600001615, - } - }) - } - - _transform(data: Buffer, encoding: BufferEncoding, callback: stream.TransformCallback) { - let offset = 0; // Offset within the current chunk - console.log(`[Highway] CALLED!!! _transform data.length = ${data.length}`); - while (offset < data.length) { - console.log(`[Highway] _transform offset = ${offset}, data.length = ${data.length}`); - const chunkSize = data.length > BlockSize ? BlockSize : data.length; - console.log(`[Highway] _transform calced chunkSize = ${chunkSize}`); - const chunk = data.slice(offset, offset + chunkSize); - const chunkMd5 = createHash('md5').update(chunk).digest(); - const head = this.buildHead(this.trans, this.offset, chunk.length, chunkMd5); - console.log(`[Highway] _transform: ${this.offset} | ${data.length} | ${chunkMd5.toString('hex')}`); - this.offset += chunk.length; - offset += chunk.length; - const headerBuffer = Buffer.allocUnsafe(9); - headerBuffer.writeUInt8(40); - headerBuffer.writeUInt32BE(head.length, 1); - headerBuffer.writeUInt32BE(chunk.length, 5); - this.push(headerBuffer); - this.push(head); - this.push(chunk); - this.push(Buffer.from([41])); - } - callback(null); - } + ip: string; + port: number; } export class PacketHighwayClient { @@ -114,166 +36,26 @@ export class PacketHighwayClient { this.port = port; } - framePack(head: Buffer, body: Buffer): Buffer[] { - const buffers: Buffer[] = []; - const buffer0 = Buffer.alloc(9); - buffer0[0] = 0x28; - buffer0.writeUInt32BE(head.length, 1); - buffer0.writeUInt32BE(body.length, 5); - buffers.push(buffer0); - buffers.push(head); - buffers.push(body); - buffers.push(Buffer.from([0x29])); - return buffers; - } - - frameUnpack(frame: Buffer): [Buffer, Buffer] { - const headLen = frame.readUInt32BE(1); - const bodyLen = frame.readUInt32BE(5); - return [frame.slice(9, 9 + headLen), frame.slice(9 + headLen, 9 + headLen + bodyLen)]; - } - - async postHighwayContent(frame: Buffer[], serverURL: string, end: boolean): Promise { - try { - const combinedBuffer = Buffer.concat(frame); - const response: Response = await fetch(serverURL, { - method: 'POST', - headers: new Headers({ - 'Connection': end ? 'close' : 'keep-alive', - 'Accept-Encoding': 'identity', - 'User-Agent': 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2)', - }), - body: combinedBuffer, - }); - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status} - ${response.statusText}`); - } - const arrayBuffer = await response.arrayBuffer(); - return Buffer.from(arrayBuffer); - } catch (error) { - throw error; - } - } - - private async httpUploadBlock(trans: PacketHighwayTrans, offset: number, block: Buffer): Promise { - const highwayTransForm = new PacketHighwayTransform(trans); - const isEnd = offset + block.length === trans.size; - const md5 = crypto.createHash('md5').update(block).digest(); - const payload = highwayTransForm.buildHead(trans, offset, block.length, md5); - this.logger.log(`[Highway] httpUploadBlock: payload = ${toHexString(payload)}`); - const frame = this.framePack(Buffer.from(payload), block); - const addr = this.sig.serverAddr[0]; - this.logger.log(`[Highway] httpUploadBlock: ${offset} | ${block.length} | ${toHexString(md5)}`); - const resp = await this.postHighwayContent(frame, `http://${addr.ip}:${addr.port}/cgi-bin/httpconn?htcmd=0x6FF0087&uin=3767830885`, isEnd); - const [head, body] = this.frameUnpack(resp); - const headData = new NapProtoMsg(RespDataHighwayHead).decode(head); - this.logger.log(`[Highway] ${headData.errorCode} | ${headData.msgSegHead?.retCode} | ${headData.bytesRspExtendInfo} | ${head.toString('hex')} | ${body.toString('hex')}`); - if (headData.errorCode !== 0) { - throw new Error(`[Highway] upload failed (code: ${headData.errorCode})`); - } - } - - async httpUpload(cmd: number, data: stream.Readable, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array): Promise { - const trans: PacketHighwayTrans = { + private buildTrans(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array, timeout: number = 3600): PacketHighwayTrans { + return { uin: this.sig.uin, cmd: cmd, + command: 'PicUp.DataUp', data: data, sum: md5, size: fileSize, ticket: this.sig.sigSession!, ext: extendInfo, encrypt: false, - timeout: 360, // TODO: - }; - let offset = 0; - console.log(`[Highway] httpUpload trans=${JSON.stringify(trans)}`); - for await (const chunk of data) { - let buffer = chunk as Buffer; - try { - await this.httpUploadBlock(trans, offset, buffer); - } catch (err) { - console.error(`Error uploading block at offset ${offset}: ${err}`); - throw err; - } - offset += buffer.length; - } + timeout: timeout, + ip: this.ip, + port: this.port, + } as PacketHighwayTrans; } - async tcpUpload(cmd: number, data: stream.Readable, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array): Promise { - const trans: PacketHighwayTrans = { - uin: this.sig.uin, - cmd: cmd, - data: data, - sum: md5, - size: fileSize, - ticket: this.sig.sigSession!, - ext: extendInfo, - encrypt: false, - timeout: 360, // TODO: - }; - const highwayTransForm = new PacketHighwayTransform(trans); - return new Promise((resolve, reject) => { - const socket = net.connect(this.port, this.ip, () => { - trans.data.pipe(highwayTransForm).pipe(socket, {end: false}); - }) - const handleRspHeader = (header: Buffer) => { - console.log(`[Highway] handleRspHeader: ${header.toString('hex')}`); - const rsp = new NapProtoMsg(RespDataHighwayHead).decode(header); - if (rsp.errorCode !== 0) { - this.logger.logWarn(`highway upload failed (code: ${rsp.errorCode})`); - trans.data.unpipe(highwayTransForm).destroy(); - highwayTransForm.unpipe(socket).destroy(); - socket.end(); - reject(new Error(`highway upload failed (code: ${rsp.errorCode})`)); - } else { - const percent = ((Number(rsp.msgSegHead?.dataOffset) + Number(rsp.msgSegHead?.dataLength)) / Number(rsp.msgSegHead?.filesize)).toFixed(2); - this.logger.log(`[Highway] ${rsp.errorCode} | ${percent} | ${Buffer.from(header).toString('hex')}`); - if (rsp.msgSegHead?.flag === 1) { - this.logger.log('[Highway] tcpUpload finished.'); - socket.end(); - resolve(); - } - // if (Number(rsp.msgSegHead?.dataOffset) + Number(rsp.msgSegHead?.dataLength) > Number(rsp.msgSegHead?.filesize)) { - // this.logger.log('[Highway] tcpUpload finished.'); - // socket.end(); - // resolve(); - // } - } - }; - let buf = Buffer.alloc(0); - socket.on('data', (chunk: Buffer) => { - try { - buf = buf.length ? Buffer.concat([buf, chunk]) : chunk; - while (buf.length >= 5) { - const len = buf.readInt32BE(1); - if (buf.length >= len + 10) { - handleRspHeader(buf.slice(9, len + 9)); - buf = buf.slice(len + 10); - } else { - break; - } - } - } catch (e) { - this.logger.logError(`[Highway] upload error: ${e}`); - } - }) - socket.on('close', () => { - this.logger.log('[Highway] socket closed.'); - resolve(); - }) - socket.on('error', (err) => { - this.logger.logError('[Highway] socket.on tcpUpload error:', err); - }) - trans.data.on('error', (err) => { - this.logger.logError('[Highway] readable tcpUpload error:', err); - socket.end(); - }) - if (trans.timeout) { - setTimeout(() => { - this.logger.logError('[Highway] tcpUpload timeout!'); - socket.end(); - }, trans.timeout * 1000); - } - }) + async upload(cmd: number, data: ReadStream, fileSize: number, md5: Uint8Array, extendInfo: Uint8Array): Promise { + const trans = this.buildTrans(cmd, data, fileSize, md5, extendInfo); + const httpUploader = new HighwayHttpUploader(trans, this.logger); + await httpUploader.upload(); } } diff --git a/src/core/packet/highway/frame.ts b/src/core/packet/highway/frame.ts new file mode 100644 index 00000000..8c681286 --- /dev/null +++ b/src/core/packet/highway/frame.ts @@ -0,0 +1,23 @@ +import assert from "node:assert"; + +export class Frame{ + static pack(head: Buffer, body: Buffer): Buffer { + const totalLength = 9 + head.length + body.length + 1; + const buffer = Buffer.allocUnsafe(totalLength); + buffer[0] = 0x28; + buffer.writeUInt32BE(head.length, 1); + buffer.writeUInt32BE(body.length, 5); + head.copy(buffer, 9); + body.copy(buffer, 9 + head.length); + buffer[totalLength - 1] = 0x29; + return buffer; + } + + static unpack(frame: Buffer): [Buffer, Buffer] { + assert(frame[0] === 0x28 && frame[frame.length - 1] === 0x29, 'Invalid frame!'); + const headLen = frame.readUInt32BE(1); + const bodyLen = frame.readUInt32BE(5); + assert(frame.length === 9 + headLen + bodyLen + 1, 'Frame length does not match head and body lengths!'); + return [frame.subarray(9, 9 + headLen), frame.subarray(9 + headLen, 9 + headLen + bodyLen)]; + } +} diff --git a/src/core/packet/highway/session.ts b/src/core/packet/highway/session.ts index 9e311707..842dbb93 100644 --- a/src/core/packet/highway/session.ts +++ b/src/core/packet/highway/session.ts @@ -2,16 +2,17 @@ import * as fs from "node:fs"; import {LogWrapper} from "@/common/log"; import {PacketClient} from "@/core/packet/client"; import {PacketPacker} from "@/core/packet/packer"; -import {NapProtoEncodeStructType, NapProtoMsg} from "@/core/packet/proto/NapProto"; +import {NapProtoMsg} from "@/core/packet/proto/NapProto"; import {HttpConn0x6ff_501Response} from "@/core/packet/proto/action/action"; import {PacketHighwayClient} from "@/core/packet/highway/client"; import {ChatType, Peer} from "@/core"; -import {IPv4, NTV2RichMediaResp} from "@/core/packet/proto/oidb/common/Ntv2.RichMediaResp"; +import {NTV2RichMediaResp} from "@/core/packet/proto/oidb/common/Ntv2.RichMediaResp"; import {OidbSvcTrpcTcpBaseRsp} from "@/core/packet/proto/oidb/OidbBase"; import {PacketMsgPicElement} from "@/core/packet/msg/element"; -import {NTHighwayIPv4, NTV2RichMediaHighwayExt} from "@/core/packet/proto/highway/highway"; +import {NTV2RichMediaHighwayExt} from "@/core/packet/proto/highway/highway"; +import {int32ip2str, oidbIpv4s2HighwayIpv4s} from "@/core/packet/highway/utils"; -export const BlockSize = 1024 * 1024; +export const BlockSize = 512 * 1024; interface HighwayServerAddr { ip: string @@ -25,12 +26,14 @@ export interface PacketHighwaySig { serverAddr: HighwayServerAddr[] } + + export class PacketHighwaySession { protected packetClient: PacketClient; + protected packetHighwayClient: PacketHighwayClient; + protected sig: PacketHighwaySig; protected logger: LogWrapper; protected packer: PacketPacker; - protected sig: PacketHighwaySig; - protected packetHighwayClient: PacketHighwayClient; constructor(logger: LogWrapper, client: PacketClient) { this.packetClient = client; @@ -45,56 +48,50 @@ export class PacketHighwaySession { this.packetHighwayClient = new PacketHighwayClient(this.sig, this.logger); } - get available(): boolean { - return this.packetClient.available && this.sig.sigSession !== null && - this.sig.sessionKey !== null && this.sig.serverAddr.length > 0; - } - - private int32ip2str(ip: number) { - ip = ip & 0xffffffff; - return [ip & 0xff, (ip & 0xff00) >> 8, (ip & 0xff0000) >> 16, ((ip & 0xff000000) >> 24) & 0xff].join('.'); - } - - private oidbIpv4s2HighwayIpv4s(ipv4s: NapProtoEncodeStructType[]): NapProtoEncodeStructType[] { - return ipv4s.map((ipv4) => { - return { - domain: { - isEnable: true, - ip: this.int32ip2str(ipv4.outIP!), - } - } as NapProtoEncodeStructType - }) + private async checkAvailable() { + if (!this.packetClient.available) { + this.logger.logError('[Highway] packetClient not available!'); + throw new Error('packetClient not available!'); + } + if (this.sig.sigSession === null || this.sig.sessionKey === null) { + this.logger.logError('[Highway] sigSession or sessionKey not available!'); + await this.prepareUpload(); + } } // TODO: add signal to auto prepare when ready // TODO: refactor - async prepareUpload(): Promise { - this.logger.log('[Highway] prepare tcpUpload!'); + private async prepareUpload(): Promise { + this.logger.logDebug('[Highway] prepareUpload!'); const packet = this.packer.packHttp0x6ff_501(); const req = await this.packetClient.sendPacket('HttpConn.0x6ff_501', packet, true); - const u8RspData = Buffer.from(req.hex_data, 'hex'); - const rsp = new NapProtoMsg(HttpConn0x6ff_501Response).decode(u8RspData); + const rsp = new NapProtoMsg(HttpConn0x6ff_501Response).decode( + Buffer.from(req.hex_data, 'hex') + ); this.sig.sigSession = rsp.httpConn.sigSession this.sig.sessionKey = rsp.httpConn.sessionKey - // this.logger.log(`[Highway] sigSession ${Buffer.from(this.sigSession).toString('hex')}, - // sessionKey ${Buffer.from(this.sessionKey).toString('hex')}`) + this.logger.logDebug(`[Highway] sigSession ${Buffer.from(this.sig.sigSession).toString('hex')}, sessionKey ${Buffer.from(this.sig.sessionKey).toString('hex')}`) for (const info of rsp.httpConn.serverInfos) { if (info.serviceType !== 1) continue; for (const addr of info.serverAddrs) { - this.logger.log(`[Highway PrepareUpload] server addr add: ${this.int32ip2str(addr.ip)}:${addr.port}`); + this.logger.logDebug(`[Highway PrepareUpload] server addr add: ${int32ip2str(addr.ip)}:${addr.port}`); this.sig.serverAddr.push({ - ip: this.int32ip2str(addr.ip), + ip: int32ip2str(addr.ip), port: addr.port }) } } } - private async uploadGroupImageReq(groupUin: number, img: PacketMsgPicElement): Promise { - if (!this.available) { - this.logger.logError('[Highway] not ready to Upload image!'); - return; + async uploadImage(peer: Peer, img: PacketMsgPicElement): Promise { + await this.checkAvailable(); + if (peer.chatType === ChatType.KCHATTYPEGROUP) { + await this.uploadGroupImageReq(Number(peer.peerUid), img); } + // TODO: handle c2c pic upload + } + + private async uploadGroupImageReq(groupUin: number, img: PacketMsgPicElement): Promise { const preReq = await this.packer.packUploadGroupImgReq(groupUin, img); const preRespRaw = await this.packetClient.sendPacket('OidbSvcTrpcTcp.0x11c4_100', preReq, true); const preResp = new NapProtoMsg(OidbSvcTrpcTcpBaseRsp).decode( @@ -103,8 +100,7 @@ export class PacketHighwaySession { const preRespData = new NapProtoMsg(NTV2RichMediaResp).decode(preResp.body); const ukey = preRespData.upload.uKey; if (ukey && ukey != "") { - this.logger.log(`[Highway] get upload ukey: ${ukey}, need upload!`); - this.logger.log(preRespData.upload.msgInfo) + this.logger.logDebug(`[Highway] get upload ukey: ${ukey}, need upload!`); const index = preRespData.upload.msgInfo.msgInfoBody[0].index; const sha1 = Buffer.from(index.info.fileSha1, 'hex'); const md5 = Buffer.from(index.info.fileHash, 'hex'); @@ -112,7 +108,7 @@ export class PacketHighwaySession { fileUuid: index.fileUuid, uKey: ukey, network: { - ipv4S: this.oidbIpv4s2HighwayIpv4s(preRespData.upload.ipv4S) + ipv4S: oidbIpv4s2HighwayIpv4s(preRespData.upload.ipv4S) }, msgInfoBody: preRespData.upload.msgInfo.msgInfoBody, blockSize: BlockSize, @@ -120,24 +116,17 @@ export class PacketHighwaySession { fileSha1: [sha1] } }) - console.log('extend', Buffer.from(extend).toString('hex')) - await this.packetHighwayClient.httpUpload(1004, fs.createReadStream(img.path, { highWaterMark: BlockSize }), img.size, md5, extend); + await this.packetHighwayClient.upload( + 1004, + fs.createReadStream(img.path, {highWaterMark: BlockSize}), + img.size, + md5, + extend + ); } else { this.logger.logError(`[Highway] get upload invalid ukey ${ukey}, don't need upload!`); } img.msgInfo = preRespData.upload.msgInfo; // img.groupPicExt = new NapProtoMsg(CustomFace).decode(preRespData.tcpUpload.compatQMsg) } - - async uploadImage(peer: Peer, img: PacketMsgPicElement): Promise { - await this.prepareUpload(); - if (!this.available) { - this.logger.logError('[Highway] not ready to tcpUpload image!'); - return; - } - if (peer.chatType === ChatType.KCHATTYPEGROUP) { - await this.uploadGroupImageReq(Number(peer.peerUid), img); - } - // const uploadReq - } } diff --git a/src/core/packet/highway/uploader.ts b/src/core/packet/highway/uploader.ts new file mode 100644 index 00000000..eeb7cbfc --- /dev/null +++ b/src/core/packet/highway/uploader.ts @@ -0,0 +1,204 @@ +import stream from "node:stream"; +import {LogWrapper} from "@/common/log"; +import * as tea from "@/core/packet/utils/crypto/tea"; +import {NapProtoMsg} from "@/core/packet/proto/NapProto"; +import {ReqDataHighwayHead, RespDataHighwayHead} from "@/core/packet/proto/highway/highway"; +import {BlockSize} from "@/core/packet/highway/session"; +import {createHash} from "crypto"; +import {PacketHighwayTrans} from "@/core/packet/highway/client"; +import net from "node:net"; +import {Frame} from "@/core/packet/highway/frame"; +import crypto from "node:crypto"; +import http from "node:http"; + +abstract class HighwayUploader { + readonly trans: PacketHighwayTrans; + readonly logger : LogWrapper; + + constructor(trans: PacketHighwayTrans, logger: LogWrapper) { + this.trans = trans; + this.logger = logger; + } + + encryptTransExt(key: Uint8Array) { + if (!this.trans.encrypt) return; + this.trans.ext = tea.encrypt(Buffer.from(this.trans.ext), Buffer.from(key)); + } + + buildHead(offset: number, bodyLength: number, bodyMd5: Uint8Array): Uint8Array { + return new NapProtoMsg(ReqDataHighwayHead).encode({ + msgBaseHead: { + version: 1, + uin: this.trans.uin, + command: "PicUp.DataUp", + seq: 0, + retryTimes: 0, + appId: 1600001604, + dataFlag: 16, + commandId: this.trans.cmd, + }, + msgSegHead: { + serviceId: 0, + filesize: BigInt(this.trans.size), + dataOffset: BigInt(offset), + dataLength: bodyLength, + serviceTicket: this.trans.ticket, + md5: bodyMd5, + fileMd5: this.trans.sum, + cacheAddr: 0, + cachePort: 0, + }, + bytesReqExtendInfo: this.trans.ext, + timestamp: BigInt(0), + msgLoginSigHead: { + uint32LoginSigType: 8, + appId: 1600001604, + } + }) + } + + abstract upload(): Promise; +} + +class HighwayTcpUploaderTransform extends stream.Transform { + uploader: HighwayTcpUploader + + constructor(uploader: HighwayTcpUploader) { + super(); + this.uploader = uploader; + } + + _transform(data: Buffer, _: BufferEncoding, callback: stream.TransformCallback) { + let offset = 0; + this.uploader.logger.log(`[Highway] CALLED!!! _transform data.length = ${data.length}`); + while (offset < data.length) { + this.uploader.logger.log(`[Highway] _transform offset = ${offset}, data.length = ${data.length}`); + const chunkSize = data.length > BlockSize ? BlockSize : data.length; + this.uploader.logger.log(`[Highway] _transform calced chunkSize = ${chunkSize}`); + const chunk = data.slice(offset, offset + chunkSize); + const chunkMd5 = createHash('md5').update(chunk).digest(); + const head = this.uploader.buildHead(offset, chunk.length, chunkMd5); + this.uploader.logger.log(`[Highway] _transform: ${offset} | ${data.length} | ${chunkMd5.toString('hex')}`); + offset += chunk.length; + this.push(Frame.pack(Buffer.from(head), chunk)); + } + callback(null); + } +} + +export class HighwayTcpUploader extends HighwayUploader { + async upload(): Promise { + const highwayTransForm = new HighwayTcpUploaderTransform(this); + const upload = new Promise((resolve, reject) => { + const socket = net.connect(this.trans.port, this.trans.ip, () => { + this.trans.data.pipe(highwayTransForm).pipe(socket, {end: false}); + }) + const handleRspHeader = (header: Buffer) => { + const rsp = new NapProtoMsg(RespDataHighwayHead).decode(header); + if (rsp.errorCode !== 0) { + this.logger.logWarn(`highway upload failed (code: ${rsp.errorCode})`); + } else { + const percent = ((Number(rsp.msgSegHead?.dataOffset) + Number(rsp.msgSegHead?.dataLength)) / Number(rsp.msgSegHead?.filesize)).toFixed(2); + this.logger.log(`[Highway] ${rsp.errorCode} | ${percent} | ${Buffer.from(header).toString('hex')}`); + if (Number(rsp.msgSegHead?.dataOffset) + Number(rsp.msgSegHead?.dataLength) >= Number(rsp.msgSegHead?.filesize)) { + this.logger.log('[Highway] tcpUpload finished.'); + socket.end(); + resolve(); + } + } + }; + socket.on('data', (chunk: Buffer) => { + try { + const [head, _] = Frame.unpack(chunk); + handleRspHeader(head); + } catch (e) { + this.logger.logError(`[Highway] upload parse response error: ${e}`); + } + }) + socket.on('close', () => { + this.logger.log('[Highway] socket closed.'); + resolve(); + }) + socket.on('error', (err) => { + this.logger.logError('[Highway] socket.on tcpUpload error:', err); + }) + this.trans.data.on('error', (err) => { + this.logger.logError('[Highway] readable tcpUpload error:', err); + socket.end(); + }) + }) + const timeout = new Promise((_, reject) => { + setTimeout(() => { + reject(new Error(`[Highway] Upload timeout after ${this.trans.timeout}s`)) + }, (this.trans.timeout ?? Infinity) * 1000 + ) + }) + await Promise.race([upload, timeout]); + } +} + +export class HighwayHttpUploader extends HighwayUploader { + async upload(): Promise { + let offset = 0; + this.logger.logDebug(`[Highway] httpUpload trans=${JSON.stringify(this.trans)}`); + for await (const chunk of this.trans.data) { + let block = chunk as Buffer; + this.logger.logDebug(`[Highway] httpUpload chunk!!! buffer.length = ${block.length}`); + try { + await this.uploadBlock(block, offset); + } catch (err) { + this.logger.logError(`Error uploading block at offset ${offset}: ${err}`); + throw err; + } + offset += block.length; + } + } + + private async uploadBlock(block: Buffer, offset: number): Promise { + const isEnd = offset + block.length === this.trans.size; + const chunkMD5 = crypto.createHash('md5').update(block).digest(); + const payload = this.buildHead(offset, block.length, chunkMD5); + this.logger.log(`[Highway] httpUploadBlock: payload = ${Buffer.from(payload).toString('hex')}`); + const frame = Frame.pack(Buffer.from(payload), block) + this.logger.log(`[Highway] httpUploadBlock: ${offset} | ${block.length} | ${Buffer.from(chunkMD5).toString('hex')}`); + const resp = await this.httpPostHighwayContent(frame, `http://${this.trans.ip}:${this.trans.port}/cgi-bin/httpconn?htcmd=0x6FF0087&uin=${this.trans.uin}`, isEnd); + const [head, body] = Frame.unpack(resp); + const headData = new NapProtoMsg(RespDataHighwayHead).decode(head); + this.logger.log(`[Highway] httpUploadBlock: ${headData.errorCode} | ${headData.msgSegHead?.retCode} | ${headData.bytesRspExtendInfo} | ${head.toString('hex')} | ${body.toString('hex')}`); + if (headData.errorCode !== 0) { + this.logger.logError(`[Highway] httpUploadBlock failed (code=${headData.errorCode})`); + } + } + + private async httpPostHighwayContent(frame: Buffer, serverURL: string, end: boolean): Promise { + return new Promise((resolve, reject) => { + try { + const options: http.RequestOptions = { + method: 'POST', + headers: { + 'Connection': 'keep-alive', + 'Accept-Encoding': 'identity', + 'User-Agent': 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2)', + 'Content-Length': frame.length.toString(), + }, + }; + const req = http.request(serverURL, options, (res) => { + let data = Buffer.alloc(0); + res.on('data', (chunk) => { + data = Buffer.concat([data, chunk]); + }); + res.on('end', () => { + console.log(`[Highway] postHighwayContent: ${data.toString('hex')}`); + resolve(data); + }); + }); + req.write(frame); + req.on('error', (error) => { + reject(error); + }); + } catch (error) { + reject(error); + } + }); + } +} diff --git a/src/core/packet/highway/utils.ts b/src/core/packet/highway/utils.ts new file mode 100644 index 00000000..65e0be44 --- /dev/null +++ b/src/core/packet/highway/utils.ts @@ -0,0 +1,20 @@ +import {NapProtoEncodeStructType} from "@/core/packet/proto/NapProto"; +import {IPv4} from "@/core/packet/proto/oidb/common/Ntv2.RichMediaResp"; +import {NTHighwayIPv4} from "@/core/packet/proto/highway/highway"; + +export const int32ip2str = (ip: number) => { + ip = ip & 0xffffffff; + return [ip & 0xff, (ip & 0xff00) >> 8, (ip & 0xff0000) >> 16, ((ip & 0xff000000) >> 24) & 0xff].join('.'); +} + +export const oidbIpv4s2HighwayIpv4s = (ipv4s: NapProtoEncodeStructType[]): NapProtoEncodeStructType[] =>{ + return ipv4s.map((ip) => { + return { + domain: { + isEnable: true, + ip: int32ip2str(ip.outIP!), + }, + port: ip.outPort! + } as NapProtoEncodeStructType + }) +}