diff --git a/src/server/http.ts b/src/common/server/http.ts similarity index 98% rename from src/server/http.ts rename to src/common/server/http.ts index f744de5..7500564 100644 --- a/src/server/http.ts +++ b/src/common/server/http.ts @@ -1,5 +1,5 @@ import express, {Express, Request, Response} from "express"; -import {getConfigUtil, log} from "../common/utils"; +import {getConfigUtil, log} from "../utils"; import http from "http"; const JSONbig = require('json-bigint')({storeAsString: true}); diff --git a/src/common/server/websocket.ts b/src/common/server/websocket.ts new file mode 100644 index 0000000..8d6b7de --- /dev/null +++ b/src/common/server/websocket.ts @@ -0,0 +1,95 @@ +import {Server, WebSocket} from "ws"; +import {getConfigUtil, log} from "../utils"; +import urlParse from "url"; +import {IncomingMessage} from "node:http"; + +class WebsocketClientBase { + private wsClient: WebSocket + + constructor() { + } + + send(msg: string) { + if (this.wsClient && this.wsClient.readyState == WebSocket.OPEN) { + this.wsClient.send(msg); + } + } + + onMessage(msg: string){ + + } +} + +export class WebsocketServerBase { + private ws: Server = null; + + constructor() { + console.log(`llonebot websocket service started`) + } + + start(port: number) { + this.ws = new Server({port}); + this.ws.on("connection", (wsClient, req)=>{ + const url = req.url.split("?").shift() + this.authorize(wsClient, req); + this.onConnect(wsClient, url, req); + wsClient.on("message", async (msg)=>{ + this.onMessage(wsClient, url, msg.toString()) + }) + }) + } + + stop() { + this.ws.close((err) => { + log("ws server close failed!", err) + }); + this.ws = null; + } + restart(port: number){ + this.stop(); + this.start(port); + } + + authorize(wsClient: WebSocket, req) { + let token = getConfigUtil().getConfig().token; + const url = req.url.split("?").shift(); + log("ws connect", url) + let clientToken: string = "" + const authHeader = req.headers['authorization']; + if (authHeader) { + clientToken = authHeader.split("Bearer ").pop() + log("receive ws header token", clientToken); + } else { + const parsedUrl = urlParse.parse(req.url, true); + const urlToken = parsedUrl.query.access_token; + if (urlToken) { + if (Array.isArray(urlToken)) { + clientToken = urlToken[0] + } else { + clientToken = urlToken + } + log("receive ws url token", clientToken); + } + } + if (token && clientToken != token) { + this.authorizeFailed(wsClient) + return wsClient.close() + } + } + + authorizeFailed(wsClient: WebSocket) { + + } + + onConnect(wsClient: WebSocket, url: string, req: IncomingMessage) { + + } + + onMessage(wsClient: WebSocket, url: string, msg: string) { + + } + + sendHeart() { + + } +} \ No newline at end of file diff --git a/src/main/main.ts b/src/main/main.ts index c83eb10..1b955fd 100644 --- a/src/main/main.ts +++ b/src/main/main.ts @@ -4,7 +4,7 @@ import {BrowserWindow, ipcMain} from 'electron'; import fs from 'fs'; import {Config} from "../common/types"; import {CHANNEL_GET_CONFIG, CHANNEL_LOG, CHANNEL_SET_CONFIG,} from "../common/channels"; -import {initWebsocket, postMsg} from "../onebot11/server"; +import {ob11WebsocketServer} from "../onebot11/server/ws/WebsocketServer"; import {CONFIG_DIR, getConfigUtil, log} from "../common/utils"; import {addHistoryMsg, getGroupMember, msgHistory, selfInfo} from "../common/data"; import {hookNTQQApiCall, hookNTQQApiReceive, ReceiveCmd, registerReceiveHook} from "../ntqqapi/hook"; @@ -14,6 +14,8 @@ import {ChatType, RawMessage} from "../ntqqapi/types"; import {ob11HTTPServer} from "../onebot11/server/http"; import {OB11FriendRecallNoticeEvent} from "../onebot11/event/notice/OB11FriendRecallNoticeEvent"; import {OB11GroupRecallNoticeEvent} from "../onebot11/event/notice/OB11GroupRecallNoticeEvent"; +import {postEvent} from "../onebot11/server/postevent"; +import {ob11ReverseWebsockets} from "../onebot11/server/ws/ReverseWebsocket"; let running = false; @@ -34,13 +36,44 @@ function onLoad() { if (arg.ob11.httpPort != oldConfig.ob11.httpPort && arg.ob11.enableHttp) { ob11HTTPServer.restart(arg.ob11.httpPort); } + // 判断是否启用或关闭HTTP服务 if (!arg.ob11.enableHttp) { - ob11HTTPServer.stop() + ob11HTTPServer.stop(); } else { ob11HTTPServer.start(arg.ob11.httpPort); } + // 正向ws端口变化,重启服务 if (arg.ob11.wsPort != oldConfig.ob11.wsPort) { - initWebsocket(arg.ob11.wsPort) + ob11WebsocketServer.restart(arg.ob11.wsPort); + } + // 判断是否启用或关闭正向ws + if (arg.ob11.enableWs != oldConfig.ob11.enableWs) { + if (arg.ob11.enableWs) { + ob11WebsocketServer.start(arg.ob11.wsPort); + } else { + ob11WebsocketServer.stop(); + } + } + // 判断是否启用或关闭反向ws + if (arg.ob11.enableWsReverse != oldConfig.ob11.enableWsReverse) { + if (arg.ob11.enableWsReverse) { + ob11ReverseWebsockets.start(); + } else { + ob11ReverseWebsockets.stop(); + } + } + if (arg.ob11.enableWsReverse) { + // 判断反向ws地址有变化 + if (arg.ob11.wsHosts.length != oldConfig.ob11.wsHosts.length) { + ob11ReverseWebsockets.restart(); + } else { + for (const newHost of arg.ob11.wsHosts) { + if (!oldConfig.ob11.wsHosts.includes(newHost)) { + ob11ReverseWebsockets.restart(); + break; + } + } + } } }) @@ -64,7 +97,7 @@ function onLoad() { if (msg.user_id.toString() == selfInfo.uin && !reportSelfMessage) { return } - postMsg(msg); + postEvent(msg); // log("post msg", msg) }).catch(e => log("constructMessage error: ", e.toString())); } @@ -90,7 +123,7 @@ function onLoad() { } if (message.chatType == ChatType.friend) { const friendRecallEvent = new OB11FriendRecallNoticeEvent(parseInt(message.senderUin), oriMessage.msgShortId); - postMsg(friendRecallEvent); + postEvent(friendRecallEvent); } else if (message.chatType == ChatType.group) { let operatorId = message.senderUin for (const element of message.elements) { @@ -105,7 +138,7 @@ function onLoad() { oriMessage.msgShortId ) - postMsg(groupRecallEvent); + postEvent(groupRecallEvent); } continue } @@ -126,12 +159,20 @@ function onLoad() { }) NTQQApi.getGroups(true).then() const config = getConfigUtil().getConfig() - try { - ob11HTTPServer.start(config.ob11.httpPort) - initWebsocket(config.ob11.wsPort); - } catch (e) { - console.log("start failed", e) + if (config.ob11.enableHttp) { + try { + ob11HTTPServer.start(config.ob11.httpPort) + } catch (e) { + log("http server start failed", e); + } } + if (config.ob11.enableWs){ + ob11WebsocketServer.start(config.ob11.wsPort); + } + if (config.ob11.enableWsReverse){ + ob11ReverseWebsockets.start(); + } + log("LLOneBot start") } diff --git a/src/ntqqapi/hook.ts b/src/ntqqapi/hook.ts index bd0255c..ec29621 100644 --- a/src/ntqqapi/hook.ts +++ b/src/ntqqapi/hook.ts @@ -5,8 +5,8 @@ import {Group, RawMessage, User} from "./types"; import {addHistoryMsg, friends, groups, msgHistory} from "../common/data"; import {OB11GroupDecreaseEvent} from "../onebot11/event/notice/OB11GroupDecreaseEvent"; import {OB11GroupIncreaseEvent} from "../onebot11/event/notice/OB11GroupIncreaseEvent"; -import {postMsg} from "../onebot11/server"; import {v4 as uuidv4} from "uuid" +import {postEvent} from "../onebot11/server/postevent"; export let hookApiCallbacks: Record void> = {} @@ -155,7 +155,7 @@ async function processGroupEvent(payload) { for (const member of oldMembers) { if (!newMembersSet.has(member.uin)) { - postMsg(new OB11GroupDecreaseEvent(group.groupCode, parseInt(member.uin))); + postEvent(new OB11GroupDecreaseEvent(group.groupCode, parseInt(member.uin))); break; } } @@ -174,7 +174,7 @@ async function processGroupEvent(payload) { group.members = newMembers; for (const member of newMembers) { if (!oldMembersSet.has(member.uin)) { - postMsg(new OB11GroupIncreaseEvent(group.groupCode, parseInt(member.uin))); + postEvent(new OB11GroupIncreaseEvent(group.groupCode, parseInt(member.uin))); break; } } diff --git a/src/onebot11/ReconnectingWebsocket.ts b/src/onebot11/ReconnectingWebsocket.ts deleted file mode 100644 index ecf5c5e..0000000 --- a/src/onebot11/ReconnectingWebsocket.ts +++ /dev/null @@ -1,56 +0,0 @@ -import {log} from "../common/utils"; - -const WebSocket = require("ws"); - -export class ReconnectingWebsocket { - private websocket; - private readonly url: string; - - public constructor(url: string) { - this.url = url; - this.reconnect() - } - - public onopen = function (){} - - public onmessage = function (msg){} - - public onclose = function () {} - - public send(msg) { - if (this.websocket && this.websocket.readyState == WebSocket.OPEN) { - this.websocket.send(msg); - } - } - - private reconnect() { - this.websocket = new WebSocket(this.url, { - handshakeTimeout: 2000, - perMessageDeflate: false - }); - - console.log("Trying to connect to the websocket server: " + this.url); - - const instance = this; - - this.websocket.on("open", function open() { - console.log("Connected to the websocket server: " + instance.url); - instance.onopen(); - }); - - this.websocket.on("message", function message(data) { - instance.onmessage(data.toString()); - }); - - this.websocket.on("error", console.error); - - this.websocket.on("close", function close() { - console.log("The websocket connection: " + instance.url + " closed, trying reconnecting..."); - instance.onclose(); - - setTimeout(() => { - instance.reconnect(); - }, 3000); // TODO: 重连间隔在配置文件中实现 - }); - } -} \ No newline at end of file diff --git a/src/onebot11/event/manager.ts b/src/onebot11/event/manager.ts deleted file mode 100644 index 93ce191..0000000 --- a/src/onebot11/event/manager.ts +++ /dev/null @@ -1,24 +0,0 @@ -import * as websocket from "ws"; -import {PostMsgType, wsReply} from "../server"; -import {ReconnectingWebsocket} from "../ReconnectingWebsocket"; - -const websocketList = []; - -export function registerEventSender(ws: websocket.WebSocket | ReconnectingWebsocket) { - websocketList.push(ws); -} - -export function unregisterEventSender(ws: websocket.WebSocket | ReconnectingWebsocket) { - let index = websocketList.indexOf(ws); - if (index !== -1) { - websocketList.splice(index, 1); - } -} - -export function callEvent(event: PostMsgType) { - new Promise(() => { - for (const ws of websocketList) { - wsReply(ws, event); - } - }).then() -} \ No newline at end of file diff --git a/src/onebot11/server.ts b/src/onebot11/server.ts deleted file mode 100644 index df87476..0000000 --- a/src/onebot11/server.ts +++ /dev/null @@ -1,199 +0,0 @@ -import * as websocket from "ws"; -import urlParse from "url"; -import {getConfigUtil, log} from "../common/utils"; -import {selfInfo} from "../common/data"; -import {OB11Message} from './types'; -import {actionMap} from "./action"; -import {OB11WebsocketResponse} from "./action/utils"; -import {callEvent, registerEventSender, unregisterEventSender} from "./event/manager"; -import {ReconnectingWebsocket} from "./ReconnectingWebsocket"; -import {ActionName} from "./action/types"; -import {OB11BaseMetaEvent} from "./event/meta/OB11BaseMetaEvent"; -import {OB11BaseNoticeEvent} from "./event/notice/OB11BaseNoticeEvent"; -import BaseAction from "./action/BaseAction"; -import {LifeCycleSubType, OB11LifeCycleEvent} from "./event/meta/OB11LifeCycleEvent"; -import {OB11HeartbeatEvent} from "./event/meta/OB11HeartbeatEvent"; - -let heartbeatRunning = false; -let websocketServer = null; - -export function initWebsocket(port: number) { - const {heartInterval, ob11: {enableWs}, token} = getConfigUtil().getConfig() - if (!heartbeatRunning) { - setInterval(() => { - callEvent(new OB11HeartbeatEvent(true, true, heartInterval)); - }, heartInterval); // 心跳包 - - heartbeatRunning = true; - } - if (enableWs) { - if (websocketServer) { - websocketServer.close((err) => { - log("ws server close failed!", err) - }) - } - - websocketServer = new websocket.Server({port}); - console.log(`llonebot websocket service started 0.0.0.0:${port}`); - - websocketServer.on("connection", (ws, req) => { - const url = req.url.split("?").shift(); - log("receive ws connect", url) - let clientToken: string = "" - const authHeader = req.headers['authorization']; - if (authHeader) { - clientToken = authHeader.split("Bearer ").pop() - log("receive ws header token", clientToken); - } else { - const parsedUrl = urlParse.parse(req.url, true); - const urlToken = parsedUrl.query.access_token; - if (urlToken) { - if (Array.isArray(urlToken)) { - clientToken = urlToken[0] - } else { - clientToken = urlToken - } - log("receive ws url token", clientToken); - } - } - if (token && clientToken != token) { - ws.send(JSON.stringify(OB11WebsocketResponse.res(null, "failed", 1403, "token验证失败"))) - return ws.close() - } - - if (url == "/api" || url == "/api/" || url == "/") { - ws.on("message", async (msg) => { - let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}} - let echo = "" - log("收到正向Websocket消息", msg.toString()) - try { - receiveData = JSON.parse(msg.toString()) - echo = receiveData.echo - } catch (e) { - return wsReply(ws, OB11WebsocketResponse.error("json解析失败,请检查数据格式", 1400, echo)) - } - const action: BaseAction = actionMap.get(receiveData.action); - if (!action) { - return wsReply(ws, OB11WebsocketResponse.error("不支持的api " + receiveData.action, 1404, echo)) - } - try { - let handleResult = await action.websocketHandle(receiveData.params, echo); - wsReply(ws, handleResult) - } catch (e) { - wsReply(ws, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo)) - } - }) - } - if (url == "/event" || url == "/event/" || url == "/") { - registerEventSender(ws); - - log("event上报ws客户端已连接") - - try { - wsReply(ws, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT)) - } catch (e) { - log("发送生命周期失败", e) - } - - ws.on("close", () => { - log("event上报ws客户端已断开") - unregisterEventSender(ws); - }) - } - }) - } - - initReverseWebsocket(); -} - -function initReverseWebsocket() { - const config = getConfigUtil().getConfig(); - if (config.ob11.enableWsReverse) { - console.log("Prepare to connect all reverse websockets..."); - for (const url of config.ob11.wsHosts) { - new Promise(() => { - try { - let wsClient = new ReconnectingWebsocket(url); - registerEventSender(wsClient); - - wsClient.onopen = function () { - wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT)); - } - - wsClient.onclose = function () { - unregisterEventSender(wsClient); - } - - wsClient.onmessage = async function (msg) { - let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}} - let echo = "" - log("收到反向Websocket消息", msg.toString()) - try { - receiveData = JSON.parse(msg.toString()) - echo = receiveData.echo - } catch (e) { - return wsReply(wsClient, OB11WebsocketResponse.error("json解析失败,请检查数据格式", 1400, echo)) - } - const action: BaseAction = actionMap.get(receiveData.action); - if (!action) { - return wsReply(wsClient, OB11WebsocketResponse.error("不支持的api " + receiveData.action, 1404, echo)) - } - try { - let handleResult = await action.websocketHandle(receiveData.params, echo); - wsReply(wsClient, handleResult) - } catch (e) { - wsReply(wsClient, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo)) - } - } - } catch (e) { - log(e.stack); - } - }).then(); - } - } -} - -export function wsReply(wsClient: websocket.WebSocket | ReconnectingWebsocket, data: OB11WebsocketResponse | PostMsgType) { - try { - let packet = Object.assign({ - echo: "" - }, data); - if (!packet.echo) { - packet.echo = ""; - } - - wsClient.send(JSON.stringify(packet)) - log("ws 消息上报", data) - } catch (e) { - log("websocket 回复失败", e) - } -} - -export type PostMsgType = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent - -export function postMsg(msg: PostMsgType) { - const config = getConfigUtil().getConfig(); - // 判断msg是否是event - if (!config.reportSelfMessage) { - if ((msg as OB11Message).user_id.toString() == selfInfo.uin) { - return - } - } - for (const host of config.ob11.httpHosts) { - fetch(host, { - method: "POST", - headers: { - "Content-Type": "application/json", - "x-self-id": selfInfo.uin - }, - body: JSON.stringify(msg) - }).then((res: any) => { - log(`新消息事件HTTP上报成功: ${host} ` + JSON.stringify(msg)); - }, (err: any) => { - log(`新消息事件HTTP上报失败: ${host} ` + err + JSON.stringify(msg)); - }); - } - - log("新消息事件ws上报", msg); - callEvent(msg); -} diff --git a/src/onebot11/server/http.ts b/src/onebot11/server/http.ts index b7408a2..066fa4a 100644 --- a/src/onebot11/server/http.ts +++ b/src/onebot11/server/http.ts @@ -1,7 +1,7 @@ import {Response} from "express"; import {getConfigUtil} from "../../common/utils"; import {OB11Response} from "../action/utils"; -import {HttpServerBase} from "../../server/http"; +import {HttpServerBase} from "../../common/server/http"; import {actionHandlers} from "../action"; class OB11HTTPServer extends HttpServerBase { diff --git a/src/onebot11/server/postevent.ts b/src/onebot11/server/postevent.ts new file mode 100644 index 0000000..44ca141 --- /dev/null +++ b/src/onebot11/server/postevent.ts @@ -0,0 +1,57 @@ +import {getConfigUtil, log} from "../../common/utils"; +import {OB11Message} from "../types"; +import {selfInfo} from "../../common/data"; +import {OB11BaseMetaEvent} from "../event/meta/OB11BaseMetaEvent"; +import {OB11BaseNoticeEvent} from "../event/notice/OB11BaseNoticeEvent"; +import * as websocket from "ws"; +import {wsReply} from "./ws/reply"; + +export type PostEventType = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent + +const eventWSList: websocket.WebSocket[] = []; + +export function registerWsEventSender(ws: websocket.WebSocket) { + eventWSList.push(ws); +} + +export function unregisterWsEventSender(ws: websocket.WebSocket) { + let index = eventWSList.indexOf(ws); + if (index !== -1) { + eventWSList.splice(index, 1); + } +} + +export function postWsEvent(event: PostEventType) { + new Promise(() => { + for (const ws of eventWSList) { + wsReply(ws, event); + } + }).then() +} + +export function postEvent(msg: PostEventType) { + const config = getConfigUtil().getConfig(); + // 判断msg是否是event + if (!config.reportSelfMessage) { + if ((msg as OB11Message).user_id.toString() == selfInfo.uin) { + return + } + } + if (config.ob11.enableHttpPost) { + for (const host of config.ob11.httpHosts) { + fetch(host, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-self-id": selfInfo.uin + }, + body: JSON.stringify(msg) + }).then((res: any) => { + log(`新消息事件HTTP上报成功: ${host} ` + JSON.stringify(msg)); + }, (err: any) => { + log(`新消息事件HTTP上报失败: ${host} ` + err + JSON.stringify(msg)); + }); + } + } + postWsEvent(msg); +} \ No newline at end of file diff --git a/src/onebot11/server/ws/ReverseWebsocket.ts b/src/onebot11/server/ws/ReverseWebsocket.ts new file mode 100644 index 0000000..9c8ee10 --- /dev/null +++ b/src/onebot11/server/ws/ReverseWebsocket.ts @@ -0,0 +1,149 @@ +import {getConfigUtil, log} from "../../../common/utils"; + +import * as WebSocket from "ws"; +import {selfInfo} from "../../../common/data"; +import {LifeCycleSubType, OB11LifeCycleEvent} from "../../event/meta/OB11LifeCycleEvent"; +import {ActionName} from "../../action/types"; +import {OB11WebsocketResponse} from "../../action/utils"; +import BaseAction from "../../action/BaseAction"; +import {actionMap} from "../../action"; +import {registerWsEventSender, unregisterWsEventSender} from "../postevent"; +import {wsReply} from "./reply"; + +export let rwsList: ReverseWebsocket[] = []; + +export class ReverseWebsocket { + public websocket: WebSocket.WebSocket; + public url: string; + private running: boolean = false; + + public constructor(url: string) { + this.url = url; + this.running = true; + this.connect(); + } + public stop(){ + this.running = false; + unregisterWsEventSender(this.websocket); + this.websocket.close(); + } + + public onopen = function () { + } + + public onmessage = function (msg: string) { + } + + public onclose = function () { + unregisterWsEventSender(this.websocket); + } + + public send(msg: string) { + if (this.websocket && this.websocket.readyState == WebSocket.OPEN) { + this.websocket.send(msg); + } + } + + private reconnect() { + setTimeout(() => { + this.connect(); + }, 3000); // TODO: 重连间隔在配置文件中实现 + } + + private connect() { + const {token} = getConfigUtil().getConfig() + this.websocket = new WebSocket.WebSocket(this.url, { + handshakeTimeout: 2000, + perMessageDeflate: false, + headers: { + 'X-Self-ID': selfInfo.uin, + 'Authorization': `Bearer ${token}` + } + }); + log("Trying to connect to the websocket server: " + this.url); + + const instance = this; + + this.websocket.on("open", function open() { + log("Connected to the websocket server: " + instance.url); + instance.onopen(); + }); + + this.websocket.on("message", function message(data) { + instance.onmessage(data.toString()); + }); + + this.websocket.on("error", log); + + this.websocket.on("close", function close() { + log("The websocket connection: " + instance.url + " closed, trying reconnecting..."); + instance.onclose(); + + if (instance.running) { + instance.reconnect(); + } + }); + } +} + +class OB11ReverseWebsockets { + start() { + for (const url of getConfigUtil().getConfig().ob11.wsHosts) { + log("开始连接反向ws", url) + new Promise(() => { + try { + let rwsClient = new ReverseWebsocket(url); + rwsList.push(rwsClient); + registerWsEventSender(rwsClient.websocket); + + rwsClient.onopen = function () { + wsReply(rwsClient.websocket, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT)); + } + + rwsClient.onclose = function () { + log("反向ws断开", url); + unregisterWsEventSender(rwsClient.websocket); + } + + rwsClient.onmessage = async function (msg) { + let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}} + let echo = "" + log("收到反向Websocket消息", msg.toString()) + try { + receiveData = JSON.parse(msg.toString()) + echo = receiveData.echo + } catch (e) { + return wsReply(rwsClient.websocket, OB11WebsocketResponse.error("json解析失败,请检查数据格式", 1400, echo)) + } + const action: BaseAction = actionMap.get(receiveData.action); + if (!action) { + return wsReply(rwsClient.websocket, OB11WebsocketResponse.error("不支持的api " + receiveData.action, 1404, echo)) + } + try { + let handleResult = await action.websocketHandle(receiveData.params, echo); + wsReply(rwsClient.websocket, handleResult) + } catch (e) { + wsReply(rwsClient.websocket, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo)) + } + } + } catch (e) { + log(e.stack); + } + }).then(); + } + } + + stop() { + for(let rws of rwsList){ + rws.stop(); + } + } + + restart() { + this.stop(); + this.start(); + } +} + +export const ob11ReverseWebsockets = new OB11ReverseWebsockets(); + diff --git a/src/onebot11/server/ws/WebsocketServer.ts b/src/onebot11/server/ws/WebsocketServer.ts new file mode 100644 index 0000000..e1e2649 --- /dev/null +++ b/src/onebot11/server/ws/WebsocketServer.ts @@ -0,0 +1,73 @@ +import {WebSocket} from "ws"; +import {getConfigUtil, log} from "../../../common/utils"; +import {actionMap} from "../../action"; +import {OB11WebsocketResponse} from "../../action/utils"; +import {postWsEvent, registerWsEventSender, unregisterWsEventSender} from "../postevent"; +import {ActionName} from "../../action/types"; +import BaseAction from "../../action/BaseAction"; +import {LifeCycleSubType, OB11LifeCycleEvent} from "../../event/meta/OB11LifeCycleEvent"; +import {OB11HeartbeatEvent} from "../../event/meta/OB11HeartbeatEvent"; +import {WebsocketServerBase} from "../../../common/server/websocket"; +import {IncomingMessage} from "node:http"; +import {wsReply} from "./reply"; + +let heartbeatRunning = false; + +class OB11WebsocketServer extends WebsocketServerBase { + authorizeFailed(wsClient: WebSocket) { + wsClient.send(JSON.stringify(OB11WebsocketResponse.res(null, "failed", 1403, "token验证失败"))) + } + + async handleAction(wsClient: WebSocket, actionName: string, params: any, echo?: string) { + const action: BaseAction = actionMap.get(actionName); + if (!action) { + return wsReply(wsClient, OB11WebsocketResponse.error("不支持的api " + actionName, 1404, echo)) + } + try { + let handleResult = await action.websocketHandle(params, echo); + wsReply(wsClient, handleResult) + } catch (e) { + wsReply(wsClient, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo)) + } + } + + onConnect(wsClient: WebSocket, url: string, req: IncomingMessage) { + if (url == "/api" || url == "/api/" || url == "/") { + wsClient.on("message", async (msg) => { + let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}} + let echo = "" + log("收到正向Websocket消息", msg.toString()) + try { + receiveData = JSON.parse(msg.toString()) + echo = receiveData.echo + } catch (e) { + return wsReply(wsClient, OB11WebsocketResponse.error("json解析失败,请检查数据格式", 1400, echo)) + } + this.handleAction(wsClient, receiveData.action, receiveData.params, receiveData.echo).then() + }) + } + if (url == "/event" || url == "/event/" || url == "/") { + registerWsEventSender(wsClient); + + log("event上报ws客户端已连接") + + try { + wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT)) + } catch (e) { + log("发送生命周期失败", e) + } + const {heartInterval} = getConfigUtil().getConfig(); + const wsClientInterval = setInterval(() => { + postWsEvent(new OB11HeartbeatEvent(true, true, heartInterval)); + }, heartInterval); // 心跳包 + wsClient.on("close", () => { + log("event上报ws客户端已断开") + clearInterval(wsClientInterval); + unregisterWsEventSender(wsClient); + }) + } + } +} + +export const ob11WebsocketServer = new OB11WebsocketServer() + diff --git a/src/onebot11/server/ws/reply.ts b/src/onebot11/server/ws/reply.ts new file mode 100644 index 0000000..66f3f7e --- /dev/null +++ b/src/onebot11/server/ws/reply.ts @@ -0,0 +1,19 @@ +import * as websocket from "ws"; +import {OB11WebsocketResponse} from "../../action/utils"; +import {PostEventType} from "../postevent"; +import {log} from "../../../common/utils"; + +export function wsReply(wsClient: websocket.WebSocket, data: OB11WebsocketResponse | PostEventType) { + try { + let packet = Object.assign({ + echo: "" + }, data); + if (!packet.echo) { + packet.echo = ""; + } + wsClient.send(JSON.stringify(packet)) + log("ws 消息上报", data) + } catch (e) { + log("websocket 回复失败", e) + } +} \ No newline at end of file diff --git a/src/renderer.ts b/src/renderer.ts index 312bc23..b1a01fa 100644 --- a/src/renderer.ts +++ b/src/renderer.ts @@ -16,7 +16,7 @@ async function onSettingWindowCreated(view: Element) {

HTTP事件上报地址(http)

+ placeholder="如:http://127.0.0.1:8080/onebot/v11/http"/> ` return eleStr @@ -25,10 +25,10 @@ async function onSettingWindowCreated(view: Element) { function createWsHostEleStr(host: string) { let eleStr = ` -

事件上报地址(反向websocket)

+

反向websocket地址:

+ placeholder="如: ws://127.0.0.1:5410/onebot"/>
` return eleStr @@ -92,7 +92,7 @@ async function onSettingWindowCreated(view: Element) {
- +
${wsHostsEleStr} diff --git a/src/server/base.ts b/src/server/base.ts deleted file mode 100644 index 7291840..0000000 --- a/src/server/base.ts +++ /dev/null @@ -1,6 +0,0 @@ - - -export abstract class ServerBase{ - abstract start: () => void - abstract restart: ()=>void -} \ No newline at end of file