feat: support reverse websocket

This commit is contained in:
Disy 2024-02-16 22:34:12 +08:00
parent f02b0bdcad
commit 018ec07082
12 changed files with 242 additions and 75 deletions

43
package-lock.json generated
View File

@ -13,12 +13,14 @@
"express": "^4.18.2",
"express-ws": "^5.0.2",
"json-bigint": "^1.0.0",
"uuid": "^9.0.1"
"uuid": "^9.0.1",
"ws": "^8.16.0"
},
"devDependencies": {
"@babel/preset-env": "^7.23.2",
"@types/express": "^4.17.20",
"@types/uuid": "^9.0.8",
"@types/ws": "^8.5.10",
"babel-loader": "^9.1.3",
"ts-loader": "^9.5.0",
"typescript": "^5.2.2",
@ -2200,6 +2202,15 @@
"integrity": "sha512-jg+97EGIcY9AGHJJRaaPVgetKDsrTgbRjQ5Msgjh/DQKEFl0DtyRr/VCOyD1T2R1MNeWPK/u7JoGhlDZnKBAfA==",
"dev": true
},
"node_modules/@types/ws": {
"version": "8.5.10",
"resolved": "https://registry.npmmirror.com/@types/ws/-/ws-8.5.10.tgz",
"integrity": "sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==",
"dev": true,
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@webassemblyjs/ast": {
"version": "1.11.6",
"resolved": "https://registry.npmjs.org/@webassemblyjs/ast/-/ast-1.11.6.tgz",
@ -3138,6 +3149,26 @@
"express": "^4.0.0 || ^5.0.0-alpha.1"
}
},
"node_modules/express-ws/node_modules/ws": {
"version": "7.5.9",
"resolved": "https://registry.npmmirror.com/ws/-/ws-7.5.9.tgz",
"integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==",
"engines": {
"node": ">=8.3.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": "^5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/fast-deep-equal": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz",
@ -4707,15 +4738,15 @@
"dev": true
},
"node_modules/ws": {
"version": "7.5.9",
"resolved": "https://registry.npmmirror.com/ws/-/ws-7.5.9.tgz",
"integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==",
"version": "8.16.0",
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.16.0.tgz",
"integrity": "sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==",
"engines": {
"node": ">=8.3.0"
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": "^5.0.2"
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {

View File

@ -21,12 +21,14 @@
"express": "^4.18.2",
"express-ws": "^5.0.2",
"json-bigint": "^1.0.0",
"uuid": "^9.0.1"
"uuid": "^9.0.1",
"ws": "^8.16.0"
},
"devDependencies": {
"@babel/preset-env": "^7.23.2",
"@types/express": "^4.17.20",
"@types/uuid": "^9.0.8",
"@types/ws": "^8.5.10",
"babel-loader": "^9.1.3",
"ts-loader": "^9.5.0",
"typescript": "^5.2.2",

View File

@ -49,6 +49,10 @@ export function isGIF(path: string) {
return buffer.toString() === 'GIF8'
}
export function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
// 定义一个异步函数来检查文件是否存在
export function checkFileReceived(path: string, timeout: number=3000): Promise<void> {

View File

@ -1,12 +1,15 @@
import {BrowserWindow} from 'electron';
import {log} from "../common/utils";
import {log, sleep} from "../common/utils";
import {NTQQApi, NTQQApiClass, sendMessagePool} from "./ntcall";
import {Group, GroupMember, RawMessage, User} from "./types";
import {addHistoryMsg, friends, groups, msgHistory} from "../common/data";
import {v4 as uuidv4} from 'uuid';
import {callEvent, EventType} from "../onebot11/event";
import {callEvent, EventType} from "../onebot11/event/manager";
import {OB11Message} from "../onebot11/types";
import {OB11Constructor} from "../onebot11/constructor";
import BaseMessageEvent from "../onebot11/event/BaseMessageEvent";
import GroupDecreaseEvent from "../onebot11/event/GroupDecreaseEvent";
import GroupIncreaseEvent from "../onebot11/event/GroupIncreaseEvent";
export let hookApiCallbacks: Record<string, (apiReturn: any) => void> = {}
@ -120,18 +123,10 @@ async function processGroupEvent(payload) {
let existGroup = groups.find(g => g.groupCode == group.groupCode);
if (existGroup) {
if (existGroup.memberCount > group.memberCount) {
console.log("群人数减少力!");
const oldMembers = existGroup.members;
console.log("旧群人员:");
for (const member of oldMembers) {
console.log(member.nick);
}
await sleep(200); // 如果请求QQ API的速度过快通常无法正确拉取到最新的群信息因此这里人为引入一个延时
const newMembers = await NTQQApi.getGroupMembers(group.groupCode);
console.log("新群人员:");
for (const member of newMembers) {
console.log(member.nick);
}
group.members = newMembers;
const newMembersSet = new Set<string>(); // 建立索引降低时间复杂度
@ -142,35 +137,26 @@ async function processGroupEvent(payload) {
for (const member of oldMembers) {
if (!newMembersSet.has(member.uin)) {
console.log("减少的群员是:" + member.uin);
callEvent(new GroupDecreaseEvent(group.groupCode, parseInt(member.uin)));
break;
}
}
}
else if (existGroup.memberCount < group.memberCount) {
console.log("群人数增加力!");
console.log("旧群人员:");
for (const member of existGroup.members) {
console.log(member.nick);
}
const oldMembers = existGroup.members;
const oldMembersSet = new Set<string>();
for (const member of existGroup.members) {
for (const member of oldMembers) {
oldMembersSet.add(member.uin);
}
await sleep(200);
const newMembers = await NTQQApi.getGroupMembers(group.groupCode);
console.log("新群人员:");
for (const member of newMembers) {
console.log(member.nick);
}
group.members = newMembers;
for (const member of newMembers) {
if (!oldMembersSet.has(member.uin)) {
console.log("增加的群员是:" + member.uin);
callEvent(new GroupIncreaseEvent(group.groupCode, parseInt(member.uin)));
break;
}
}
@ -231,13 +217,13 @@ registerReceiveHook<{ msgList: Array<RawMessage> }>(ReceiveCmd.UPDATE_MSG, (payl
registerReceiveHook<{ msgList: Array<RawMessage> }>(ReceiveCmd.NEW_MSG, (payload) => {
for (const message of payload.msgList) {
// log("收到新消息push到历史记录", message)
addHistoryMsg(message)
OB11Constructor.message(message).then(
function (message) {
callEvent<OB11Message>(EventType.MESSAGE, message);
callEvent<OB11Message>(new BaseMessageEvent(), message);
}
);
addHistoryMsg(message)
}
const msgIds = Object.keys(msgHistory);
if (msgIds.length > 30000) {

View File

@ -0,0 +1,64 @@
const WebSocket = require("ws");
class ReconnectingWebsocket {
private websocket;
private readonly url: string;
public constructor(url: string) {
this.url = url;
this.reconnect()
}
public onopen = function (){}
public onmessage = function (msg){}
public onclose = function () {}
private heartbeat() {
clearTimeout(this.websocket.pingTimeout);
this.websocket.pingTimeout = setTimeout(() => {
this.websocket.terminate();
}, 3000);
}
public send(msg) {
if (this.websocket && this.websocket.readyState == WebSocket.OPEN) {
this.websocket.send(msg);
}
}
private reconnect() {
this.websocket = new WebSocket(this.url, {
handshakeTimeout: 2000,
perMessageDeflate: false
});
console.log("Trying to connect to the websocket server: " + this.url);
const instance = this;
this.websocket.on("open", function open() {
console.log("Connected to the websocket server: " + instance.url);
instance.onopen();
});
this.websocket.on("message", function message(data) {
instance.onmessage(data.toString());
});
this.websocket.on("error", console.error);
this.websocket.on("ping", this.heartbeat);
this.websocket.on("close", function close() {
console.log("The websocket connection: " + instance.url + " closed, trying reconnecting...");
instance.onclose();
setTimeout(instance.reconnect, 3000);
});
}
}
export default ReconnectingWebsocket;

View File

@ -13,7 +13,6 @@ import { SendMessageElement } from "../../ntqqapi/types";
import { SendMsgElementConstructor } from "../../ntqqapi/constructor";
import { uri2local } from "../utils";
import { v4 as uuid4 } from 'uuid';
import { log } from "../../common/utils";
import BaseAction from "./BaseAction";
import { ActionName } from "./types";
import * as fs from "fs";

View File

@ -0,0 +1,10 @@
import {selfInfo} from "../../common/data";
import {EventType} from "./manager";
class BaseEvent {
time = new Date().getTime();
self_id = selfInfo.uin;
post_type: EventType;
}
export default BaseEvent;

View File

@ -0,0 +1,8 @@
import BaseEvent from "./BaseEvent";
import {EventType} from "./manager";
class BaseMessageEvent extends BaseEvent {
post_type = EventType.MESSAGE;
}
export default BaseMessageEvent

View File

@ -0,0 +1,20 @@
import BaseEvent from "./BaseEvent";
import {EventType} from "./manager";
class GroupDecreaseEvent extends BaseEvent {
post_type = EventType.NOTICE;
notice_type = "group_decrease";
subtype = "leave"; // TODO: 实现其他几种子类型的识别
group_id: number;
operate_id: number;
user_id: number;
constructor(groupId: number, userId: number) {
super();
this.group_id = groupId;
this.operate_id = userId; // 实际上不应该这么实现,但是现在还没有办法识别用户是被踢出的,还是自己主动退出的
this.user_id = userId;
}
}
export default GroupDecreaseEvent

View File

@ -0,0 +1,21 @@
import BaseEvent from "./BaseEvent";
import {EventType} from "./manager";
class GroupIncreaseEvent extends BaseEvent {
post_type = EventType.NOTICE;
notice_type = "group_increase";
subtype = "approve"; // TODO: 实现其他几种子类型的识别
group_id: number;
operate_id: number;
user_id: number;
constructor(groupId: number, userId: number) {
super();
this.group_id = groupId;
this.operate_id = userId; // 实际上不应该这么实现,但是现在还没有办法识别用户是被邀请的,还是主动加入的
this.user_id = userId;
}
}
export default GroupIncreaseEvent

View File

@ -1,4 +1,5 @@
import {selfInfo} from "../common/data";
import {selfInfo} from "../../common/data";
import BaseEvent from "./BaseEvent";
const websocketList = [];
@ -20,19 +21,12 @@ export function unregisterEventSender(ws) {
}
}
export function callEvent<DataType>(type: EventType, data: DataType) {
const basicEvent = {
time: new Date().getTime(),
self_id: selfInfo.uin,
post_type: type
}
export function callEvent<DataType>(event: BaseEvent, data: DataType = null) {
const assignedEvent = (data == null ? event : Object.assign(event, data));
for (const ws of websocketList) {
ws.send(
JSON.stringify(
Object.assign(basicEvent, data)
)
JSON.stringify(assignedEvent)
);
}
}

View File

@ -11,10 +11,17 @@ import { selfInfo } from "../common/data";
import { OB11Message, OB11Return, OB11MessageData } from './types';
import {actionHandlers, actionMap} from "./actions";
import {OB11Response, OB11WebsocketResponse} from "./actions/utils";
import {registerEventSender, unregisterEventSender} from "./event";
import {registerEventSender, unregisterEventSender} from "./event/manager";
import ReconnectingWebsocket from "./ReconnectingWebsocket";
// @SiberianHusky 2021-08-15
enum WebsocketType {
API,
EVENT,
ALL
}
function checkSendMessage(sendMsgList: OB11MessageData[]) {
function checkUri(uri: string): boolean {
const pattern = /^(file:\/\/|http:\/\/|https:\/\/|base64:\/\/)/;
@ -92,17 +99,29 @@ export function startExpress(port: number) {
export function startWebsocketServer(port: number) {
const config = getConfigUtil().getConfig();
if (config.enableWs) {
expressWs(expressWsApp)
expressWsApp.listen(getConfigUtil().getConfig().wsPort, function () {
console.log(`llonebot websocket service started 0.0.0.0:${port}`);
});
try {
expressWs(expressWsApp)
expressWsApp.listen(getConfigUtil().getConfig().wsPort, function () {
console.log(`llonebot websocket service started 0.0.0.0:${port}`);
});
}
catch (e) {
console.log(e);
}
}
}
export function initWebsocket() {
if (getConfigUtil().getConfig().enableWs) {
expressWsApp.ws("/api", initWebsocketServer);
expressWsApp.ws("/", initWebsocketServer);
expressWsApp.ws("/api", (ws, req) => {
initWebsocketServer(ws, req, WebsocketType.API);
});
expressWsApp.ws("/event", (ws, req) => {
initWebsocketServer(ws, req, WebsocketType.EVENT);
});
expressWsApp.ws("/", (ws, req) => {
initWebsocketServer(ws, req, WebsocketType.ALL);
});
}
initReverseWebsocket();
@ -113,11 +132,12 @@ function initReverseWebsocket() {
if (config.enableWsReverse) {
for (const url of config.wsHosts) {
try {
const wsClient = new WebSocket(url);
let wsClient = new ReconnectingWebsocket(url);
websocketClientConnections.push(wsClient);
registerEventSender(wsClient);
wsClient.onclose = function (ev) {
wsClient.onclose = function () {
console.log("The websocket connection: " + url + " closed, trying reconnecting...");
unregisterEventSender(wsClient);
let index = websocketClientConnections.indexOf(wsClient);
if (index !== -1) {
@ -125,8 +145,8 @@ function initReverseWebsocket() {
}
}
wsClient.onmessage = async function (ev) {
let message = ev.data;
wsClient.onmessage = async function (message) {
console.log(message);
if (typeof message === "string") {
try {
let recv = JSON.parse(message);
@ -147,35 +167,43 @@ function initReverseWebsocket() {
}
}
}
catch (e) {}
catch (e) {
console.log(e);
}
}
}
}
function initWebsocketServer(ws, req) {
registerEventSender(ws);
function initWebsocketServer(ws, req, type: WebsocketType) {
if (type == WebsocketType.EVENT || type == WebsocketType.ALL) {
registerEventSender(ws);
}
ws.on("message", async function (message) {
try {
let recv = JSON.parse(message);
let echo = recv.echo ?? "";
if (type == WebsocketType.API || type == WebsocketType.ALL) {
try {
let recv = JSON.parse(message);
let echo = recv.echo ?? "";
if (actionMap.has(recv.action)) {
let action = actionMap.get(recv.action)
const result = await action.websocketHandle(recv.params, echo);
ws.send(JSON.stringify(result));
if (actionMap.has(recv.action)) {
let action = actionMap.get(recv.action)
const result = await action.websocketHandle(recv.params, echo);
ws.send(JSON.stringify(result));
}
else {
ws.send(JSON.stringify(OB11WebsocketResponse.error("Bad Request", 1400, echo)));
}
} catch (e) {
log(e.stack);
ws.send(JSON.stringify(OB11WebsocketResponse.error(e.stack.toString(), 1200)));
}
else {
ws.send(JSON.stringify(OB11WebsocketResponse.error("Bad Request", 1400, echo)));
}
} catch (e) {
log(e.stack);
ws.send(JSON.stringify(OB11WebsocketResponse.error(e.stack.toString(), 1200)));
}
});
ws.on("close", function (ev) {
unregisterEventSender(ws);
if (type == WebsocketType.EVENT || type == WebsocketType.ALL) {
unregisterEventSender(ws);
}
});
}