diff --git a/src/onebot/server/http.ts b/src/onebot/server/http.ts new file mode 100644 index 00000000..53302df5 --- /dev/null +++ b/src/onebot/server/http.ts @@ -0,0 +1,57 @@ +import { Response } from 'express'; +import { OB11Response } from '../action/OB11Response'; +import { HttpServerBase } from '@/common/server/http'; +import { OB11HeartbeatEvent } from '@/onebot/event/meta/OB11HeartbeatEvent'; +import { postOB11Event } from '@/onebot/server/postOB11Event'; + +class OB11HTTPServer extends HttpServerBase { + name = 'OneBot V11 server'; + + handleFailed(res: Response, payload: any, e: Error) { + res.send(OB11Response.error(e?.stack?.toString() || e.message || 'Error Handle', 200)); + } + + protected listen(port: number, host: string) { + if (ob11Config.http.enable) { + super.listen(port, host); + } + } +} + +export const ob11HTTPServer = new OB11HTTPServer(); + +setTimeout(() => { + for (const [actionName, action] of actionMap) { + for (const method of ['post', 'get']) { + ob11HTTPServer.registerRouter(method, actionName, (res, payload) => { + return action.handle(payload); + }); + } + } +}, 0); + + +class HTTPHeart { + intervalId: NodeJS.Timeout | null = null; + start(NewHeartInterval: number | undefined = undefined) { + let { heartInterval } = ob11Config; + if (NewHeartInterval && !Number.isNaN(NewHeartInterval)) { + heartInterval = NewHeartInterval; + } + if (this.intervalId) { + clearInterval(this.intervalId); + } + this.intervalId = setInterval(() => { + // ws的心跳是ws自己维护的 + postOB11Event(new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval), false, false); + }, heartInterval); + } + + stop() { + if (this.intervalId) { + clearInterval(this.intervalId); + } + } +} + +export const httpHeart = new HTTPHeart(); diff --git a/src/onebot/server/postOB11Event.ts b/src/onebot/server/postOB11Event.ts new file mode 100644 index 00000000..80b07b6d --- /dev/null +++ b/src/onebot/server/postOB11Event.ts @@ -0,0 +1,195 @@ +import { OB11Message, OB11MessageAt, OB11MessageData, OB11MessageReply } from '../types'; +import { OB11BaseMetaEvent } from '../event/meta/OB11BaseMetaEvent'; +import { OB11BaseNoticeEvent } from '../event/notice/OB11BaseNoticeEvent'; +import { WebSocket as WebSocketClass } from 'ws'; +import { wsReply } from './ws/reply'; +import crypto from 'crypto'; +import { ChatType, Group, GroupRequestOperateTypes, Peer } from '@/core/entities'; +import { normalize, sendMsg } from '../action/msg/SendMsg'; +import { OB11FriendRequestEvent } from '../event/request/OB11FriendRequest'; +import { OB11GroupRequestEvent } from '../event/request/OB11GroupRequest'; +import { isNull } from '@/common/utils/helper'; +import { NTQQFriendApi, NTQQGroupApi, NTQQUserApi } from '@/core/apis'; +import createSendElements from '../action/msg/SendMsg/create-send-elements'; +import { NapCatCore } from '@/core'; + +export type QuickActionEvent = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent +export type PostEventType = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent + +interface QuickActionPrivateMessage { + reply?: string; + auto_escape?: boolean; +} + +interface QuickActionGroupMessage extends QuickActionPrivateMessage { + // 回复群消息 + at_sender?: boolean; + delete?: boolean; + kick?: boolean; + ban?: boolean; + ban_duration?: number; + // +} + +interface QuickActionFriendRequest { + approve?: boolean; + remark?: string; +} + +interface QuickActionGroupRequest { + approve?: boolean; + reason?: string; +} + +export type QuickAction = + QuickActionPrivateMessage + & QuickActionGroupMessage + & QuickActionFriendRequest + & QuickActionGroupRequest + +const eventWSList: WebSocketClass[] = []; + +export function registerWsEventSender(ws: WebSocketClass) { + eventWSList.push(ws); +} + +export function unregisterWsEventSender(ws: WebSocketClass) { + const index = eventWSList.indexOf(ws); + if (index !== -1) { + eventWSList.splice(index, 1); + } +} + +export function postWsEvent(event: QuickActionEvent) { + for (const ws of eventWSList) { + new Promise(() => { + wsReply(ws, event); + }).then(); + } +} + +export function postOB11Event(msg: QuickActionEvent, reportSelf = false, postWs = true) { + const config = ob11Config; + + // 判断msg是否是event + if (!config.reportSelfMessage && !reportSelf) { + if (msg.post_type === 'message' && (msg as OB11Message).user_id.toString() == selfInfo.uin) { + return; + } + } + if (config.http.enablePost) { + const msgStr = JSON.stringify(msg); + const hmac = crypto.createHmac('sha1', ob11Config.http.secret); + hmac.update(msgStr); + const sig = hmac.digest('hex'); + const headers: Record = { + 'Content-Type': 'application/json', + 'x-self-id': selfInfo.uin + }; + if (config.http.secret) { + headers['x-signature'] = 'sha1=' + sig; + } + for (const host of config.http.postUrls) { + fetch(host, { + method: 'POST', + headers, + body: msgStr + }).then(async (res) => { + //logDebug(`新消息事件HTTP上报成功: ${host} `, msgStr); + // todo: 处理不够优雅,应该使用高级泛型进行QuickAction类型识别 + let resJson: QuickAction; + try { + resJson = await res.json(); + //logDebug('新消息事件HTTP上报返回快速操作: ', JSON.stringify(resJson)); + } catch (e) { + logDebug('新消息事件HTTP上报没有返回快速操作,不需要处理'); + return; + } + try { + handleQuickOperation(msg as QuickActionEvent, resJson).then().catch(logError); + } catch (e: any) { + logError('新消息事件HTTP上报返回快速操作失败', e); + } + + }, (err: any) => { + logError(`新消息事件HTTP上报失败: ${host} `, err, msg); + }); + } + } + if (postWs) { + postWsEvent(msg); + } +} +async function handleMsg(msg: OB11Message, quickAction: QuickAction, coreContext: NapCatCore) { + const NTQQUserApi = coreContext.getApiContext().UserApi; + msg = msg as OB11Message; + const reply = quickAction.reply; + const peer: Peer = { + chatType: ChatType.friend, + peerUid: await NTQQUserApi.getUidByUin(msg.user_id.toString()) as string + }; + if (msg.message_type == 'private') { + if (msg.sub_type === 'group') { + peer.chatType = ChatType.temp; + } + } else { + peer.chatType = ChatType.group; + peer.peerUid = msg.group_id!.toString(); + } + if (reply) { + let group: Group | undefined; + let replyMessage: OB11MessageData[] = []; + + if (msg.message_type == 'group') { + group = await getGroup(msg.group_id!.toString()); + replyMessage.push({ + type: 'reply', + data: { + id: msg.message_id.toString() + } + } as OB11MessageReply); + if ((quickAction as QuickActionGroupMessage).at_sender) { + replyMessage.push({ + type: 'at', + data: { + qq: msg.user_id.toString() + } + } as OB11MessageAt); + } + } + replyMessage = replyMessage.concat(normalize(reply, quickAction.auto_escape)); + const { sendElements, deleteAfterSentFiles } = await createSendElements(replyMessage, peer); + sendMsg(peer, sendElements, deleteAfterSentFiles, false).then().catch(logError); + } +} +async function handleGroupRequest(request: OB11GroupRequestEvent, quickAction: QuickActionGroupRequest, coreContext: NapCatCore) { + const NTQQGroupApi = coreContext.getApiContext().GroupApi; + if (!isNull(quickAction.approve)) { + NTQQGroupApi.handleGroupRequest( + request.flag, + quickAction.approve ? GroupRequestOperateTypes.approve : GroupRequestOperateTypes.reject, + quickAction.reason, + ).then().catch(coreContext.context.logger.logError); + } +} +async function handleFriendRequest(request: OB11FriendRequestEvent, quickAction: QuickActionFriendRequest, coreContext: NapCatCore) { + const NTQQFriendApi = coreContext.getApiContext().FriendApi; + if (!isNull(quickAction.approve)) { + NTQQFriendApi.handleFriendRequest(request.flag, !!quickAction.approve).then().catch(coreContext.context.logger.logError); + } +} +export async function handleQuickOperation(context: QuickActionEvent, quickAction: QuickAction, coreContext: NapCatCore) { + if (context.post_type === 'message') { + handleMsg(context as OB11Message, quickAction, coreContext).then().catch(coreContext.context.logger.logError); + } + if (context.post_type === 'request') { + const friendRequest = context as OB11FriendRequestEvent; + const groupRequest = context as OB11GroupRequestEvent; + if ((friendRequest).request_type === 'friend') { + handleFriendRequest(friendRequest, quickAction, coreContext).then().catch(coreContext.context.logger.logError); + } + else if (groupRequest.request_type === 'group') { + handleGroupRequest(groupRequest, quickAction, coreContext).then().catch(coreContext.context.logger.logError); + } + } +} \ No newline at end of file diff --git a/src/onebot/server/ws/ReverseWebsocket.ts b/src/onebot/server/ws/ReverseWebsocket.ts new file mode 100644 index 00000000..08792ce7 --- /dev/null +++ b/src/onebot/server/ws/ReverseWebsocket.ts @@ -0,0 +1,147 @@ +import { LifeCycleSubType, OB11LifeCycleEvent } from '../../event/meta/OB11LifeCycleEvent'; +import { ActionName } from '../../action/types'; +import { OB11Response } from '../../action/OB11Response'; +import BaseAction from '../../action/BaseAction'; +import { actionMap } from '../../action'; +import { postWsEvent, registerWsEventSender, unregisterWsEventSender } from '../postOB11Event'; +import { wsReply } from './reply'; +import { WebSocket as WebSocketClass } from 'ws'; +import { OB11HeartbeatEvent } from '../../event/meta/OB11HeartbeatEvent'; +import { log, logDebug, logError } from '../../../common/utils/log'; +import { ob11Config } from '@/onebot11/config'; +import { napCatCore } from '@/core'; +import { selfInfo } from '@/core/data'; + +export let rwsList: ReverseWebsocket[] = []; + +export class ReverseWebsocket { + public websocket: WebSocketClass | undefined; + public url: string; + private running: boolean = false; + + public constructor(url: string) { + this.url = url; + this.running = true; + this.connect(); + } + + public stop() { + this.running = false; + this.websocket!.close(); + } + + public onopen() { + wsReply(this.websocket!, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT)); + } + + public async onmessage(msg: string) { + let receiveData: { action: ActionName | undefined, params: any, echo?: any } = { action: undefined, params: {} }; + let echo = null; + try { + receiveData = JSON.parse(msg.toString()); + echo = receiveData.echo; + //logDebug('收到反向Websocket消息', receiveData); + } catch (e) { + return wsReply(this.websocket!, OB11Response.error('json解析失败,请检查数据格式', 1400, echo)); + } + const action: BaseAction | undefined = actionMap.get(receiveData.action!); + if (!action) { + return wsReply(this.websocket!, OB11Response.error('不支持的api ' + receiveData.action, 1404, echo)); + } + try { + receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证 + const handleResult = await action.websocketHandle(receiveData.params, echo); + wsReply(this.websocket!, handleResult); + } catch (e) { + wsReply(this.websocket!, OB11Response.error(`api处理出错:${e}`, 1200, echo)); + } + } + + public onclose = () => { + logError('反向ws断开', this.url); + unregisterWsEventSender(this.websocket!); + if (this.running) { + this.reconnect(); + } + }; + + public send(msg: string) { + if (this.websocket && this.websocket.readyState == WebSocket.OPEN) { + this.websocket.send(msg); + } + } + + private reconnect() { + setTimeout(() => { + this.connect(); + }, 3000); // TODO: 重连间隔在配置文件中实现 + } + + private connect() { + const { token, heartInterval } = ob11Config; + this.websocket = new WebSocketClass(this.url, { + maxPayload: 1024 * 1024 * 1024, + handshakeTimeout: 2000, + perMessageDeflate: false, + headers: { + 'X-Self-ID': selfInfo.uin, + 'Authorization': `Bearer ${token}`, + 'x-client-role': 'Universal', // koishi-adapter-onebot 需要这个字段 + 'User-Agent': 'OneBot/11', + } + }); + registerWsEventSender(this.websocket); + logDebug('Trying to connect to the websocket server: ' + this.url); + + + this.websocket.on('open', () => { + logDebug('Connected to the websocket server: ' + this.url); + this.onopen(); + }); + + this.websocket.on('message', async (data) => { + await this.onmessage(data.toString()); + }); + + this.websocket.on('error', log); + + const wsClientInterval = setInterval(() => { + wsReply(this.websocket!, new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval)); + }, heartInterval); // 心跳包 + this.websocket.on('close', () => { + clearInterval(wsClientInterval); + logDebug('The websocket connection: ' + this.url + ' closed, trying reconnecting...'); + this.onclose(); + }); + } +} + +class OB11ReverseWebsockets { + start() { + for (const url of ob11Config.reverseWs.urls) { + log('开始连接反向ws', url); + new Promise(() => { + try { + rwsList.push(new ReverseWebsocket(url)); + } catch (e: any) { + logError(e.stack); + } + }).then(); + } + } + + stop() { + for (const rws of rwsList) { + rws.stop(); + } + rwsList = [];//清空旧的反向ws + } + + restart() { + this.stop(); + this.start(); + } +} + +export const ob11ReverseWebsockets = new OB11ReverseWebsockets(); + diff --git a/src/onebot/server/ws/WebsocketServer.ts b/src/onebot/server/ws/WebsocketServer.ts new file mode 100644 index 00000000..838de2a5 --- /dev/null +++ b/src/onebot/server/ws/WebsocketServer.ts @@ -0,0 +1,86 @@ +import { WebSocket } from 'ws'; +import http from 'http'; +import { actionMap } from '../../action'; +import { OB11Response } from '../../action/OB11Response'; +import { postWsEvent, registerWsEventSender, unregisterWsEventSender } from '../postOB11Event'; +import { ActionName } from '../../action/types'; +import BaseAction from '../../action/BaseAction'; +import { LifeCycleSubType, OB11LifeCycleEvent } from '../../event/meta/OB11LifeCycleEvent'; +import { OB11HeartbeatEvent } from '../../event/meta/OB11HeartbeatEvent'; +import { WebsocketServerBase } from '@/common/server/websocket'; +import { IncomingMessage } from 'node:http'; +import { wsReply } from './reply'; +import { napCatCore } from '@/core'; +import { log, logDebug, logError } from '../../../common/utils/log'; +import { ob11Config } from '@/onebot11/config'; +import { selfInfo } from '@/core/data'; + +const heartbeatRunning = false; + +class OB11WebsocketServer extends WebsocketServerBase { + + public start(port: number | http.Server, host: string = '') { + this.token = ob11Config.token; + super.start(port, host); + } + + authorizeFailed(wsClient: WebSocket) { + wsClient.send(JSON.stringify(OB11Response.res(null, 'failed', 1403, 'token验证失败'))); + } + + async handleAction(wsClient: WebSocket, actionName: string, params: any, echo?: any) { + const action: BaseAction | undefined = actionMap.get(actionName); + if (!action) { + return wsReply(wsClient, OB11Response.error('不支持的api ' + actionName, 1404, echo)); + } + try { + const handleResult = await action.websocketHandle(params, echo); + wsReply(wsClient, handleResult); + } catch (e: any) { + wsReply(wsClient, OB11Response.error(`api处理出错:${e.stack}`, 1200, echo)); + } + } + + onConnect(wsClient: WebSocket, url: string, req: IncomingMessage) { + if (url == '/api' || url == '/api/' || url == '/') { + wsClient.on('message', async (msg) => { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-expect-error + let receiveData: { action: ActionName, params?: any, echo?: any } = { action: '', params: {} }; + let echo = null; + try { + receiveData = JSON.parse(msg.toString()); + echo = receiveData.echo; + logDebug('收到正向Websocket消息', receiveData); + } catch (e) { + return wsReply(wsClient, OB11Response.error('json解析失败,请检查数据格式', 1400, echo)); + } + receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证 + this.handleAction(wsClient, receiveData.action, receiveData.params, receiveData.echo).then(); + }); + } + if (url == '/event' || url == '/event/' || url == '/') { + registerWsEventSender(wsClient); + + logDebug('event上报ws客户端已连接'); + + try { + wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT)); + } catch (e) { + logError('发送生命周期失败', e); + } + const { heartInterval } = ob11Config; + const wsClientInterval = setInterval(() => { + wsReply(wsClient, new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval)); + }, heartInterval); // 心跳包 + wsClient.on('close', () => { + logError('event上报ws客户端已断开'); + clearInterval(wsClientInterval); + unregisterWsEventSender(wsClient); + }); + } + } +} + +export const ob11WebsocketServer = new OB11WebsocketServer(); + diff --git a/src/onebot/server/ws/reply.ts b/src/onebot/server/ws/reply.ts new file mode 100644 index 00000000..7e53a1c6 --- /dev/null +++ b/src/onebot/server/ws/reply.ts @@ -0,0 +1,23 @@ +import { WebSocket as WebSocketClass } from 'ws'; +import { OB11Response } from '../../action/OB11Response'; +import { PostEventType } from '../postOB11Event'; +import { log, logDebug, logError } from '@/common/utils/log'; +import { isNull } from '@/common/utils/helper'; + + +export function wsReply(wsClient: WebSocketClass, data: OB11Response | PostEventType) { + try { + const packet = Object.assign({}, data); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-expect-error + if (isNull(packet['echo'])) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-expect-error + delete packet['echo']; + } + wsClient.send(JSON.stringify(packet)); + logDebug('ws 消息上报', wsClient.url || '', data); + } catch (e: any) { + logError('websocket 回复失败', e.stack, data); + } +}