mirror of
https://github.com/LLOneBot/LLOneBot.git
synced 2024-11-22 01:56:33 +00:00
fix: reverse ws restart
This commit is contained in:
parent
f092fad2f4
commit
103e0b43f8
@ -1,5 +1,5 @@
|
||||
import express, {Express, Request, Response} from "express";
|
||||
import {getConfigUtil, log} from "../common/utils";
|
||||
import {getConfigUtil, log} from "../utils";
|
||||
import http from "http";
|
||||
|
||||
const JSONbig = require('json-bigint')({storeAsString: true});
|
95
src/common/server/websocket.ts
Normal file
95
src/common/server/websocket.ts
Normal file
@ -0,0 +1,95 @@
|
||||
import {Server, WebSocket} from "ws";
|
||||
import {getConfigUtil, log} from "../utils";
|
||||
import urlParse from "url";
|
||||
import {IncomingMessage} from "node:http";
|
||||
|
||||
class WebsocketClientBase {
|
||||
private wsClient: WebSocket
|
||||
|
||||
constructor() {
|
||||
}
|
||||
|
||||
send(msg: string) {
|
||||
if (this.wsClient && this.wsClient.readyState == WebSocket.OPEN) {
|
||||
this.wsClient.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
onMessage(msg: string){
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
export class WebsocketServerBase {
|
||||
private ws: Server = null;
|
||||
|
||||
constructor() {
|
||||
console.log(`llonebot websocket service started`)
|
||||
}
|
||||
|
||||
start(port: number) {
|
||||
this.ws = new Server({port});
|
||||
this.ws.on("connection", (wsClient, req)=>{
|
||||
const url = req.url.split("?").shift()
|
||||
this.authorize(wsClient, req);
|
||||
this.onConnect(wsClient, url, req);
|
||||
wsClient.on("message", async (msg)=>{
|
||||
this.onMessage(wsClient, url, msg.toString())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
stop() {
|
||||
this.ws.close((err) => {
|
||||
log("ws server close failed!", err)
|
||||
});
|
||||
this.ws = null;
|
||||
}
|
||||
restart(port: number){
|
||||
this.stop();
|
||||
this.start(port);
|
||||
}
|
||||
|
||||
authorize(wsClient: WebSocket, req) {
|
||||
let token = getConfigUtil().getConfig().token;
|
||||
const url = req.url.split("?").shift();
|
||||
log("ws connect", url)
|
||||
let clientToken: string = ""
|
||||
const authHeader = req.headers['authorization'];
|
||||
if (authHeader) {
|
||||
clientToken = authHeader.split("Bearer ").pop()
|
||||
log("receive ws header token", clientToken);
|
||||
} else {
|
||||
const parsedUrl = urlParse.parse(req.url, true);
|
||||
const urlToken = parsedUrl.query.access_token;
|
||||
if (urlToken) {
|
||||
if (Array.isArray(urlToken)) {
|
||||
clientToken = urlToken[0]
|
||||
} else {
|
||||
clientToken = urlToken
|
||||
}
|
||||
log("receive ws url token", clientToken);
|
||||
}
|
||||
}
|
||||
if (token && clientToken != token) {
|
||||
this.authorizeFailed(wsClient)
|
||||
return wsClient.close()
|
||||
}
|
||||
}
|
||||
|
||||
authorizeFailed(wsClient: WebSocket) {
|
||||
|
||||
}
|
||||
|
||||
onConnect(wsClient: WebSocket, url: string, req: IncomingMessage) {
|
||||
|
||||
}
|
||||
|
||||
onMessage(wsClient: WebSocket, url: string, msg: string) {
|
||||
|
||||
}
|
||||
|
||||
sendHeart() {
|
||||
|
||||
}
|
||||
}
|
@ -4,7 +4,7 @@ import {BrowserWindow, ipcMain} from 'electron';
|
||||
import fs from 'fs';
|
||||
import {Config} from "../common/types";
|
||||
import {CHANNEL_GET_CONFIG, CHANNEL_LOG, CHANNEL_SET_CONFIG,} from "../common/channels";
|
||||
import {initWebsocket, postMsg} from "../onebot11/server";
|
||||
import {ob11WebsocketServer} from "../onebot11/server/ws/WebsocketServer";
|
||||
import {CONFIG_DIR, getConfigUtil, log} from "../common/utils";
|
||||
import {addHistoryMsg, getGroupMember, msgHistory, selfInfo} from "../common/data";
|
||||
import {hookNTQQApiCall, hookNTQQApiReceive, ReceiveCmd, registerReceiveHook} from "../ntqqapi/hook";
|
||||
@ -14,6 +14,8 @@ import {ChatType, RawMessage} from "../ntqqapi/types";
|
||||
import {ob11HTTPServer} from "../onebot11/server/http";
|
||||
import {OB11FriendRecallNoticeEvent} from "../onebot11/event/notice/OB11FriendRecallNoticeEvent";
|
||||
import {OB11GroupRecallNoticeEvent} from "../onebot11/event/notice/OB11GroupRecallNoticeEvent";
|
||||
import {postEvent} from "../onebot11/server/postevent";
|
||||
import {ob11ReverseWebsockets} from "../onebot11/server/ws/ReverseWebsocket";
|
||||
|
||||
|
||||
let running = false;
|
||||
@ -34,13 +36,44 @@ function onLoad() {
|
||||
if (arg.ob11.httpPort != oldConfig.ob11.httpPort && arg.ob11.enableHttp) {
|
||||
ob11HTTPServer.restart(arg.ob11.httpPort);
|
||||
}
|
||||
// 判断是否启用或关闭HTTP服务
|
||||
if (!arg.ob11.enableHttp) {
|
||||
ob11HTTPServer.stop()
|
||||
ob11HTTPServer.stop();
|
||||
} else {
|
||||
ob11HTTPServer.start(arg.ob11.httpPort);
|
||||
}
|
||||
// 正向ws端口变化,重启服务
|
||||
if (arg.ob11.wsPort != oldConfig.ob11.wsPort) {
|
||||
initWebsocket(arg.ob11.wsPort)
|
||||
ob11WebsocketServer.restart(arg.ob11.wsPort);
|
||||
}
|
||||
// 判断是否启用或关闭正向ws
|
||||
if (arg.ob11.enableWs != oldConfig.ob11.enableWs) {
|
||||
if (arg.ob11.enableWs) {
|
||||
ob11WebsocketServer.start(arg.ob11.wsPort);
|
||||
} else {
|
||||
ob11WebsocketServer.stop();
|
||||
}
|
||||
}
|
||||
// 判断是否启用或关闭反向ws
|
||||
if (arg.ob11.enableWsReverse != oldConfig.ob11.enableWsReverse) {
|
||||
if (arg.ob11.enableWsReverse) {
|
||||
ob11ReverseWebsockets.start();
|
||||
} else {
|
||||
ob11ReverseWebsockets.stop();
|
||||
}
|
||||
}
|
||||
if (arg.ob11.enableWsReverse) {
|
||||
// 判断反向ws地址有变化
|
||||
if (arg.ob11.wsHosts.length != oldConfig.ob11.wsHosts.length) {
|
||||
ob11ReverseWebsockets.restart();
|
||||
} else {
|
||||
for (const newHost of arg.ob11.wsHosts) {
|
||||
if (!oldConfig.ob11.wsHosts.includes(newHost)) {
|
||||
ob11ReverseWebsockets.restart();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
@ -64,7 +97,7 @@ function onLoad() {
|
||||
if (msg.user_id.toString() == selfInfo.uin && !reportSelfMessage) {
|
||||
return
|
||||
}
|
||||
postMsg(msg);
|
||||
postEvent(msg);
|
||||
// log("post msg", msg)
|
||||
}).catch(e => log("constructMessage error: ", e.toString()));
|
||||
}
|
||||
@ -90,7 +123,7 @@ function onLoad() {
|
||||
}
|
||||
if (message.chatType == ChatType.friend) {
|
||||
const friendRecallEvent = new OB11FriendRecallNoticeEvent(parseInt(message.senderUin), oriMessage.msgShortId);
|
||||
postMsg(friendRecallEvent);
|
||||
postEvent(friendRecallEvent);
|
||||
} else if (message.chatType == ChatType.group) {
|
||||
let operatorId = message.senderUin
|
||||
for (const element of message.elements) {
|
||||
@ -105,7 +138,7 @@ function onLoad() {
|
||||
oriMessage.msgShortId
|
||||
)
|
||||
|
||||
postMsg(groupRecallEvent);
|
||||
postEvent(groupRecallEvent);
|
||||
}
|
||||
continue
|
||||
}
|
||||
@ -126,12 +159,20 @@ function onLoad() {
|
||||
})
|
||||
NTQQApi.getGroups(true).then()
|
||||
const config = getConfigUtil().getConfig()
|
||||
if (config.ob11.enableHttp) {
|
||||
try {
|
||||
ob11HTTPServer.start(config.ob11.httpPort)
|
||||
initWebsocket(config.ob11.wsPort);
|
||||
} catch (e) {
|
||||
console.log("start failed", e)
|
||||
log("http server start failed", e);
|
||||
}
|
||||
}
|
||||
if (config.ob11.enableWs){
|
||||
ob11WebsocketServer.start(config.ob11.wsPort);
|
||||
}
|
||||
if (config.ob11.enableWsReverse){
|
||||
ob11ReverseWebsockets.start();
|
||||
}
|
||||
|
||||
log("LLOneBot start")
|
||||
}
|
||||
|
||||
|
@ -5,8 +5,8 @@ import {Group, RawMessage, User} from "./types";
|
||||
import {addHistoryMsg, friends, groups, msgHistory} from "../common/data";
|
||||
import {OB11GroupDecreaseEvent} from "../onebot11/event/notice/OB11GroupDecreaseEvent";
|
||||
import {OB11GroupIncreaseEvent} from "../onebot11/event/notice/OB11GroupIncreaseEvent";
|
||||
import {postMsg} from "../onebot11/server";
|
||||
import {v4 as uuidv4} from "uuid"
|
||||
import {postEvent} from "../onebot11/server/postevent";
|
||||
|
||||
export let hookApiCallbacks: Record<string, (apiReturn: any) => void> = {}
|
||||
|
||||
@ -155,7 +155,7 @@ async function processGroupEvent(payload) {
|
||||
|
||||
for (const member of oldMembers) {
|
||||
if (!newMembersSet.has(member.uin)) {
|
||||
postMsg(new OB11GroupDecreaseEvent(group.groupCode, parseInt(member.uin)));
|
||||
postEvent(new OB11GroupDecreaseEvent(group.groupCode, parseInt(member.uin)));
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -174,7 +174,7 @@ async function processGroupEvent(payload) {
|
||||
group.members = newMembers;
|
||||
for (const member of newMembers) {
|
||||
if (!oldMembersSet.has(member.uin)) {
|
||||
postMsg(new OB11GroupIncreaseEvent(group.groupCode, parseInt(member.uin)));
|
||||
postEvent(new OB11GroupIncreaseEvent(group.groupCode, parseInt(member.uin)));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1,56 +0,0 @@
|
||||
import {log} from "../common/utils";
|
||||
|
||||
const WebSocket = require("ws");
|
||||
|
||||
export class ReconnectingWebsocket {
|
||||
private websocket;
|
||||
private readonly url: string;
|
||||
|
||||
public constructor(url: string) {
|
||||
this.url = url;
|
||||
this.reconnect()
|
||||
}
|
||||
|
||||
public onopen = function (){}
|
||||
|
||||
public onmessage = function (msg){}
|
||||
|
||||
public onclose = function () {}
|
||||
|
||||
public send(msg) {
|
||||
if (this.websocket && this.websocket.readyState == WebSocket.OPEN) {
|
||||
this.websocket.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private reconnect() {
|
||||
this.websocket = new WebSocket(this.url, {
|
||||
handshakeTimeout: 2000,
|
||||
perMessageDeflate: false
|
||||
});
|
||||
|
||||
console.log("Trying to connect to the websocket server: " + this.url);
|
||||
|
||||
const instance = this;
|
||||
|
||||
this.websocket.on("open", function open() {
|
||||
console.log("Connected to the websocket server: " + instance.url);
|
||||
instance.onopen();
|
||||
});
|
||||
|
||||
this.websocket.on("message", function message(data) {
|
||||
instance.onmessage(data.toString());
|
||||
});
|
||||
|
||||
this.websocket.on("error", console.error);
|
||||
|
||||
this.websocket.on("close", function close() {
|
||||
console.log("The websocket connection: " + instance.url + " closed, trying reconnecting...");
|
||||
instance.onclose();
|
||||
|
||||
setTimeout(() => {
|
||||
instance.reconnect();
|
||||
}, 3000); // TODO: 重连间隔在配置文件中实现
|
||||
});
|
||||
}
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
import * as websocket from "ws";
|
||||
import {PostMsgType, wsReply} from "../server";
|
||||
import {ReconnectingWebsocket} from "../ReconnectingWebsocket";
|
||||
|
||||
const websocketList = [];
|
||||
|
||||
export function registerEventSender(ws: websocket.WebSocket | ReconnectingWebsocket) {
|
||||
websocketList.push(ws);
|
||||
}
|
||||
|
||||
export function unregisterEventSender(ws: websocket.WebSocket | ReconnectingWebsocket) {
|
||||
let index = websocketList.indexOf(ws);
|
||||
if (index !== -1) {
|
||||
websocketList.splice(index, 1);
|
||||
}
|
||||
}
|
||||
|
||||
export function callEvent(event: PostMsgType) {
|
||||
new Promise(() => {
|
||||
for (const ws of websocketList) {
|
||||
wsReply(ws, event);
|
||||
}
|
||||
}).then()
|
||||
}
|
@ -1,199 +0,0 @@
|
||||
import * as websocket from "ws";
|
||||
import urlParse from "url";
|
||||
import {getConfigUtil, log} from "../common/utils";
|
||||
import {selfInfo} from "../common/data";
|
||||
import {OB11Message} from './types';
|
||||
import {actionMap} from "./action";
|
||||
import {OB11WebsocketResponse} from "./action/utils";
|
||||
import {callEvent, registerEventSender, unregisterEventSender} from "./event/manager";
|
||||
import {ReconnectingWebsocket} from "./ReconnectingWebsocket";
|
||||
import {ActionName} from "./action/types";
|
||||
import {OB11BaseMetaEvent} from "./event/meta/OB11BaseMetaEvent";
|
||||
import {OB11BaseNoticeEvent} from "./event/notice/OB11BaseNoticeEvent";
|
||||
import BaseAction from "./action/BaseAction";
|
||||
import {LifeCycleSubType, OB11LifeCycleEvent} from "./event/meta/OB11LifeCycleEvent";
|
||||
import {OB11HeartbeatEvent} from "./event/meta/OB11HeartbeatEvent";
|
||||
|
||||
let heartbeatRunning = false;
|
||||
let websocketServer = null;
|
||||
|
||||
export function initWebsocket(port: number) {
|
||||
const {heartInterval, ob11: {enableWs}, token} = getConfigUtil().getConfig()
|
||||
if (!heartbeatRunning) {
|
||||
setInterval(() => {
|
||||
callEvent(new OB11HeartbeatEvent(true, true, heartInterval));
|
||||
}, heartInterval); // 心跳包
|
||||
|
||||
heartbeatRunning = true;
|
||||
}
|
||||
if (enableWs) {
|
||||
if (websocketServer) {
|
||||
websocketServer.close((err) => {
|
||||
log("ws server close failed!", err)
|
||||
})
|
||||
}
|
||||
|
||||
websocketServer = new websocket.Server({port});
|
||||
console.log(`llonebot websocket service started 0.0.0.0:${port}`);
|
||||
|
||||
websocketServer.on("connection", (ws, req) => {
|
||||
const url = req.url.split("?").shift();
|
||||
log("receive ws connect", url)
|
||||
let clientToken: string = ""
|
||||
const authHeader = req.headers['authorization'];
|
||||
if (authHeader) {
|
||||
clientToken = authHeader.split("Bearer ").pop()
|
||||
log("receive ws header token", clientToken);
|
||||
} else {
|
||||
const parsedUrl = urlParse.parse(req.url, true);
|
||||
const urlToken = parsedUrl.query.access_token;
|
||||
if (urlToken) {
|
||||
if (Array.isArray(urlToken)) {
|
||||
clientToken = urlToken[0]
|
||||
} else {
|
||||
clientToken = urlToken
|
||||
}
|
||||
log("receive ws url token", clientToken);
|
||||
}
|
||||
}
|
||||
if (token && clientToken != token) {
|
||||
ws.send(JSON.stringify(OB11WebsocketResponse.res(null, "failed", 1403, "token验证失败")))
|
||||
return ws.close()
|
||||
}
|
||||
|
||||
if (url == "/api" || url == "/api/" || url == "/") {
|
||||
ws.on("message", async (msg) => {
|
||||
let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}}
|
||||
let echo = ""
|
||||
log("收到正向Websocket消息", msg.toString())
|
||||
try {
|
||||
receiveData = JSON.parse(msg.toString())
|
||||
echo = receiveData.echo
|
||||
} catch (e) {
|
||||
return wsReply(ws, OB11WebsocketResponse.error("json解析失败,请检查数据格式", 1400, echo))
|
||||
}
|
||||
const action: BaseAction<any, any> = actionMap.get(receiveData.action);
|
||||
if (!action) {
|
||||
return wsReply(ws, OB11WebsocketResponse.error("不支持的api " + receiveData.action, 1404, echo))
|
||||
}
|
||||
try {
|
||||
let handleResult = await action.websocketHandle(receiveData.params, echo);
|
||||
wsReply(ws, handleResult)
|
||||
} catch (e) {
|
||||
wsReply(ws, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo))
|
||||
}
|
||||
})
|
||||
}
|
||||
if (url == "/event" || url == "/event/" || url == "/") {
|
||||
registerEventSender(ws);
|
||||
|
||||
log("event上报ws客户端已连接")
|
||||
|
||||
try {
|
||||
wsReply(ws, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT))
|
||||
} catch (e) {
|
||||
log("发送生命周期失败", e)
|
||||
}
|
||||
|
||||
ws.on("close", () => {
|
||||
log("event上报ws客户端已断开")
|
||||
unregisterEventSender(ws);
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
initReverseWebsocket();
|
||||
}
|
||||
|
||||
function initReverseWebsocket() {
|
||||
const config = getConfigUtil().getConfig();
|
||||
if (config.ob11.enableWsReverse) {
|
||||
console.log("Prepare to connect all reverse websockets...");
|
||||
for (const url of config.ob11.wsHosts) {
|
||||
new Promise(() => {
|
||||
try {
|
||||
let wsClient = new ReconnectingWebsocket(url);
|
||||
registerEventSender(wsClient);
|
||||
|
||||
wsClient.onopen = function () {
|
||||
wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
|
||||
}
|
||||
|
||||
wsClient.onclose = function () {
|
||||
unregisterEventSender(wsClient);
|
||||
}
|
||||
|
||||
wsClient.onmessage = async function (msg) {
|
||||
let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}}
|
||||
let echo = ""
|
||||
log("收到反向Websocket消息", msg.toString())
|
||||
try {
|
||||
receiveData = JSON.parse(msg.toString())
|
||||
echo = receiveData.echo
|
||||
} catch (e) {
|
||||
return wsReply(wsClient, OB11WebsocketResponse.error("json解析失败,请检查数据格式", 1400, echo))
|
||||
}
|
||||
const action: BaseAction<any, any> = actionMap.get(receiveData.action);
|
||||
if (!action) {
|
||||
return wsReply(wsClient, OB11WebsocketResponse.error("不支持的api " + receiveData.action, 1404, echo))
|
||||
}
|
||||
try {
|
||||
let handleResult = await action.websocketHandle(receiveData.params, echo);
|
||||
wsReply(wsClient, handleResult)
|
||||
} catch (e) {
|
||||
wsReply(wsClient, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo))
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
log(e.stack);
|
||||
}
|
||||
}).then();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function wsReply(wsClient: websocket.WebSocket | ReconnectingWebsocket, data: OB11WebsocketResponse | PostMsgType) {
|
||||
try {
|
||||
let packet = Object.assign({
|
||||
echo: ""
|
||||
}, data);
|
||||
if (!packet.echo) {
|
||||
packet.echo = "";
|
||||
}
|
||||
|
||||
wsClient.send(JSON.stringify(packet))
|
||||
log("ws 消息上报", data)
|
||||
} catch (e) {
|
||||
log("websocket 回复失败", e)
|
||||
}
|
||||
}
|
||||
|
||||
export type PostMsgType = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent
|
||||
|
||||
export function postMsg(msg: PostMsgType) {
|
||||
const config = getConfigUtil().getConfig();
|
||||
// 判断msg是否是event
|
||||
if (!config.reportSelfMessage) {
|
||||
if ((msg as OB11Message).user_id.toString() == selfInfo.uin) {
|
||||
return
|
||||
}
|
||||
}
|
||||
for (const host of config.ob11.httpHosts) {
|
||||
fetch(host, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"x-self-id": selfInfo.uin
|
||||
},
|
||||
body: JSON.stringify(msg)
|
||||
}).then((res: any) => {
|
||||
log(`新消息事件HTTP上报成功: ${host} ` + JSON.stringify(msg));
|
||||
}, (err: any) => {
|
||||
log(`新消息事件HTTP上报失败: ${host} ` + err + JSON.stringify(msg));
|
||||
});
|
||||
}
|
||||
|
||||
log("新消息事件ws上报", msg);
|
||||
callEvent(msg);
|
||||
}
|
@ -1,7 +1,7 @@
|
||||
import {Response} from "express";
|
||||
import {getConfigUtil} from "../../common/utils";
|
||||
import {OB11Response} from "../action/utils";
|
||||
import {HttpServerBase} from "../../server/http";
|
||||
import {HttpServerBase} from "../../common/server/http";
|
||||
import {actionHandlers} from "../action";
|
||||
|
||||
class OB11HTTPServer extends HttpServerBase {
|
||||
|
57
src/onebot11/server/postevent.ts
Normal file
57
src/onebot11/server/postevent.ts
Normal file
@ -0,0 +1,57 @@
|
||||
import {getConfigUtil, log} from "../../common/utils";
|
||||
import {OB11Message} from "../types";
|
||||
import {selfInfo} from "../../common/data";
|
||||
import {OB11BaseMetaEvent} from "../event/meta/OB11BaseMetaEvent";
|
||||
import {OB11BaseNoticeEvent} from "../event/notice/OB11BaseNoticeEvent";
|
||||
import * as websocket from "ws";
|
||||
import {wsReply} from "./ws/reply";
|
||||
|
||||
export type PostEventType = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent
|
||||
|
||||
const eventWSList: websocket.WebSocket[] = [];
|
||||
|
||||
export function registerWsEventSender(ws: websocket.WebSocket) {
|
||||
eventWSList.push(ws);
|
||||
}
|
||||
|
||||
export function unregisterWsEventSender(ws: websocket.WebSocket) {
|
||||
let index = eventWSList.indexOf(ws);
|
||||
if (index !== -1) {
|
||||
eventWSList.splice(index, 1);
|
||||
}
|
||||
}
|
||||
|
||||
export function postWsEvent(event: PostEventType) {
|
||||
new Promise(() => {
|
||||
for (const ws of eventWSList) {
|
||||
wsReply(ws, event);
|
||||
}
|
||||
}).then()
|
||||
}
|
||||
|
||||
export function postEvent(msg: PostEventType) {
|
||||
const config = getConfigUtil().getConfig();
|
||||
// 判断msg是否是event
|
||||
if (!config.reportSelfMessage) {
|
||||
if ((msg as OB11Message).user_id.toString() == selfInfo.uin) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if (config.ob11.enableHttpPost) {
|
||||
for (const host of config.ob11.httpHosts) {
|
||||
fetch(host, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"x-self-id": selfInfo.uin
|
||||
},
|
||||
body: JSON.stringify(msg)
|
||||
}).then((res: any) => {
|
||||
log(`新消息事件HTTP上报成功: ${host} ` + JSON.stringify(msg));
|
||||
}, (err: any) => {
|
||||
log(`新消息事件HTTP上报失败: ${host} ` + err + JSON.stringify(msg));
|
||||
});
|
||||
}
|
||||
}
|
||||
postWsEvent(msg);
|
||||
}
|
149
src/onebot11/server/ws/ReverseWebsocket.ts
Normal file
149
src/onebot11/server/ws/ReverseWebsocket.ts
Normal file
@ -0,0 +1,149 @@
|
||||
import {getConfigUtil, log} from "../../../common/utils";
|
||||
|
||||
import * as WebSocket from "ws";
|
||||
import {selfInfo} from "../../../common/data";
|
||||
import {LifeCycleSubType, OB11LifeCycleEvent} from "../../event/meta/OB11LifeCycleEvent";
|
||||
import {ActionName} from "../../action/types";
|
||||
import {OB11WebsocketResponse} from "../../action/utils";
|
||||
import BaseAction from "../../action/BaseAction";
|
||||
import {actionMap} from "../../action";
|
||||
import {registerWsEventSender, unregisterWsEventSender} from "../postevent";
|
||||
import {wsReply} from "./reply";
|
||||
|
||||
export let rwsList: ReverseWebsocket[] = [];
|
||||
|
||||
export class ReverseWebsocket {
|
||||
public websocket: WebSocket.WebSocket;
|
||||
public url: string;
|
||||
private running: boolean = false;
|
||||
|
||||
public constructor(url: string) {
|
||||
this.url = url;
|
||||
this.running = true;
|
||||
this.connect();
|
||||
}
|
||||
public stop(){
|
||||
this.running = false;
|
||||
unregisterWsEventSender(this.websocket);
|
||||
this.websocket.close();
|
||||
}
|
||||
|
||||
public onopen = function () {
|
||||
}
|
||||
|
||||
public onmessage = function (msg: string) {
|
||||
}
|
||||
|
||||
public onclose = function () {
|
||||
unregisterWsEventSender(this.websocket);
|
||||
}
|
||||
|
||||
public send(msg: string) {
|
||||
if (this.websocket && this.websocket.readyState == WebSocket.OPEN) {
|
||||
this.websocket.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private reconnect() {
|
||||
setTimeout(() => {
|
||||
this.connect();
|
||||
}, 3000); // TODO: 重连间隔在配置文件中实现
|
||||
}
|
||||
|
||||
private connect() {
|
||||
const {token} = getConfigUtil().getConfig()
|
||||
this.websocket = new WebSocket.WebSocket(this.url, {
|
||||
handshakeTimeout: 2000,
|
||||
perMessageDeflate: false,
|
||||
headers: {
|
||||
'X-Self-ID': selfInfo.uin,
|
||||
'Authorization': `Bearer ${token}`
|
||||
}
|
||||
});
|
||||
log("Trying to connect to the websocket server: " + this.url);
|
||||
|
||||
const instance = this;
|
||||
|
||||
this.websocket.on("open", function open() {
|
||||
log("Connected to the websocket server: " + instance.url);
|
||||
instance.onopen();
|
||||
});
|
||||
|
||||
this.websocket.on("message", function message(data) {
|
||||
instance.onmessage(data.toString());
|
||||
});
|
||||
|
||||
this.websocket.on("error", log);
|
||||
|
||||
this.websocket.on("close", function close() {
|
||||
log("The websocket connection: " + instance.url + " closed, trying reconnecting...");
|
||||
instance.onclose();
|
||||
|
||||
if (instance.running) {
|
||||
instance.reconnect();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class OB11ReverseWebsockets {
|
||||
start() {
|
||||
for (const url of getConfigUtil().getConfig().ob11.wsHosts) {
|
||||
log("开始连接反向ws", url)
|
||||
new Promise(() => {
|
||||
try {
|
||||
let rwsClient = new ReverseWebsocket(url);
|
||||
rwsList.push(rwsClient);
|
||||
registerWsEventSender(rwsClient.websocket);
|
||||
|
||||
rwsClient.onopen = function () {
|
||||
wsReply(rwsClient.websocket, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
|
||||
}
|
||||
|
||||
rwsClient.onclose = function () {
|
||||
log("反向ws断开", url);
|
||||
unregisterWsEventSender(rwsClient.websocket);
|
||||
}
|
||||
|
||||
rwsClient.onmessage = async function (msg) {
|
||||
let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}}
|
||||
let echo = ""
|
||||
log("收到反向Websocket消息", msg.toString())
|
||||
try {
|
||||
receiveData = JSON.parse(msg.toString())
|
||||
echo = receiveData.echo
|
||||
} catch (e) {
|
||||
return wsReply(rwsClient.websocket, OB11WebsocketResponse.error("json解析失败,请检查数据格式", 1400, echo))
|
||||
}
|
||||
const action: BaseAction<any, any> = actionMap.get(receiveData.action);
|
||||
if (!action) {
|
||||
return wsReply(rwsClient.websocket, OB11WebsocketResponse.error("不支持的api " + receiveData.action, 1404, echo))
|
||||
}
|
||||
try {
|
||||
let handleResult = await action.websocketHandle(receiveData.params, echo);
|
||||
wsReply(rwsClient.websocket, handleResult)
|
||||
} catch (e) {
|
||||
wsReply(rwsClient.websocket, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo))
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
log(e.stack);
|
||||
}
|
||||
}).then();
|
||||
}
|
||||
}
|
||||
|
||||
stop() {
|
||||
for(let rws of rwsList){
|
||||
rws.stop();
|
||||
}
|
||||
}
|
||||
|
||||
restart() {
|
||||
this.stop();
|
||||
this.start();
|
||||
}
|
||||
}
|
||||
|
||||
export const ob11ReverseWebsockets = new OB11ReverseWebsockets();
|
||||
|
73
src/onebot11/server/ws/WebsocketServer.ts
Normal file
73
src/onebot11/server/ws/WebsocketServer.ts
Normal file
@ -0,0 +1,73 @@
|
||||
import {WebSocket} from "ws";
|
||||
import {getConfigUtil, log} from "../../../common/utils";
|
||||
import {actionMap} from "../../action";
|
||||
import {OB11WebsocketResponse} from "../../action/utils";
|
||||
import {postWsEvent, registerWsEventSender, unregisterWsEventSender} from "../postevent";
|
||||
import {ActionName} from "../../action/types";
|
||||
import BaseAction from "../../action/BaseAction";
|
||||
import {LifeCycleSubType, OB11LifeCycleEvent} from "../../event/meta/OB11LifeCycleEvent";
|
||||
import {OB11HeartbeatEvent} from "../../event/meta/OB11HeartbeatEvent";
|
||||
import {WebsocketServerBase} from "../../../common/server/websocket";
|
||||
import {IncomingMessage} from "node:http";
|
||||
import {wsReply} from "./reply";
|
||||
|
||||
let heartbeatRunning = false;
|
||||
|
||||
class OB11WebsocketServer extends WebsocketServerBase {
|
||||
authorizeFailed(wsClient: WebSocket) {
|
||||
wsClient.send(JSON.stringify(OB11WebsocketResponse.res(null, "failed", 1403, "token验证失败")))
|
||||
}
|
||||
|
||||
async handleAction(wsClient: WebSocket, actionName: string, params: any, echo?: string) {
|
||||
const action: BaseAction<any, any> = actionMap.get(actionName);
|
||||
if (!action) {
|
||||
return wsReply(wsClient, OB11WebsocketResponse.error("不支持的api " + actionName, 1404, echo))
|
||||
}
|
||||
try {
|
||||
let handleResult = await action.websocketHandle(params, echo);
|
||||
wsReply(wsClient, handleResult)
|
||||
} catch (e) {
|
||||
wsReply(wsClient, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo))
|
||||
}
|
||||
}
|
||||
|
||||
onConnect(wsClient: WebSocket, url: string, req: IncomingMessage) {
|
||||
if (url == "/api" || url == "/api/" || url == "/") {
|
||||
wsClient.on("message", async (msg) => {
|
||||
let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}}
|
||||
let echo = ""
|
||||
log("收到正向Websocket消息", msg.toString())
|
||||
try {
|
||||
receiveData = JSON.parse(msg.toString())
|
||||
echo = receiveData.echo
|
||||
} catch (e) {
|
||||
return wsReply(wsClient, OB11WebsocketResponse.error("json解析失败,请检查数据格式", 1400, echo))
|
||||
}
|
||||
this.handleAction(wsClient, receiveData.action, receiveData.params, receiveData.echo).then()
|
||||
})
|
||||
}
|
||||
if (url == "/event" || url == "/event/" || url == "/") {
|
||||
registerWsEventSender(wsClient);
|
||||
|
||||
log("event上报ws客户端已连接")
|
||||
|
||||
try {
|
||||
wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT))
|
||||
} catch (e) {
|
||||
log("发送生命周期失败", e)
|
||||
}
|
||||
const {heartInterval} = getConfigUtil().getConfig();
|
||||
const wsClientInterval = setInterval(() => {
|
||||
postWsEvent(new OB11HeartbeatEvent(true, true, heartInterval));
|
||||
}, heartInterval); // 心跳包
|
||||
wsClient.on("close", () => {
|
||||
log("event上报ws客户端已断开")
|
||||
clearInterval(wsClientInterval);
|
||||
unregisterWsEventSender(wsClient);
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const ob11WebsocketServer = new OB11WebsocketServer()
|
||||
|
19
src/onebot11/server/ws/reply.ts
Normal file
19
src/onebot11/server/ws/reply.ts
Normal file
@ -0,0 +1,19 @@
|
||||
import * as websocket from "ws";
|
||||
import {OB11WebsocketResponse} from "../../action/utils";
|
||||
import {PostEventType} from "../postevent";
|
||||
import {log} from "../../../common/utils";
|
||||
|
||||
export function wsReply(wsClient: websocket.WebSocket, data: OB11WebsocketResponse | PostEventType) {
|
||||
try {
|
||||
let packet = Object.assign({
|
||||
echo: ""
|
||||
}, data);
|
||||
if (!packet.echo) {
|
||||
packet.echo = "";
|
||||
}
|
||||
wsClient.send(JSON.stringify(packet))
|
||||
log("ws 消息上报", data)
|
||||
} catch (e) {
|
||||
log("websocket 回复失败", e)
|
||||
}
|
||||
}
|
@ -16,7 +16,7 @@ async function onSettingWindowCreated(view: Element) {
|
||||
<h2>HTTP事件上报地址(http)</h2>
|
||||
<input class="httpHost input-text" type="text" value="${host}"
|
||||
style="width:60%;padding: 5px"
|
||||
placeholder="如果localhost上报失败试试局域网ip"/>
|
||||
placeholder="如:http://127.0.0.1:8080/onebot/v11/http"/>
|
||||
</setting-item>
|
||||
`
|
||||
return eleStr
|
||||
@ -25,10 +25,10 @@ async function onSettingWindowCreated(view: Element) {
|
||||
function createWsHostEleStr(host: string) {
|
||||
let eleStr = `
|
||||
<setting-item data-direction="row" class="hostItem vertical-list-item ${reverseWSClass}">
|
||||
<h2>事件上报地址(反向websocket)</h2>
|
||||
<h2>反向websocket地址:</h2>
|
||||
<input class="wsHost input-text" type="text" value="${host}"
|
||||
style="width:60%;padding: 5px"
|
||||
placeholder="如果localhost上报失败试试局域网ip"/>
|
||||
placeholder="如: ws://127.0.0.1:5410/onebot"/>
|
||||
</setting-item>
|
||||
`
|
||||
return eleStr
|
||||
@ -92,7 +92,7 @@ async function onSettingWindowCreated(view: Element) {
|
||||
</setting-item>
|
||||
<div class="${reverseWSClass}" style="display: ${config.ob11.enableWsReverse ? '' : 'none'}">
|
||||
<div>
|
||||
<button id="addWsHost" class="q-button">添加反向Websocket上报地址</button>
|
||||
<button id="addWsHost" class="q-button">添加反向Websocket地址</button>
|
||||
</div>
|
||||
<div id="wsHostItems">
|
||||
${wsHostsEleStr}
|
||||
|
@ -1,6 +0,0 @@
|
||||
|
||||
|
||||
export abstract class ServerBase{
|
||||
abstract start: () => void
|
||||
abstract restart: ()=>void
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user