begin refactoring ob11/server

This commit is contained in:
Wesley F. Young 2024-08-10 13:48:27 +08:00
parent 32a932ad5c
commit c6e980ed96
5 changed files with 0 additions and 513 deletions

View File

@ -1,57 +0,0 @@
import { Response } from 'express';
import { OB11Response } from '../action/OB11Response';
import { HttpServerBase } from '@/common/server/http';
import { OB11HeartbeatEvent } from '@/onebot/event/meta/OB11HeartbeatEvent';
import { postOB11Event } from '@/onebot/server/postOB11Event';
class OB11HTTPServer extends HttpServerBase {
name = 'OneBot V11 server';
handleFailed(res: Response, payload: any, e: Error) {
res.send(OB11Response.error(e?.stack?.toString() || e.message || 'Error Handle', 200));
}
protected listen(port: number, host: string) {
if (ob11Config.http.enable) {
super.listen(port, host);
}
}
}
export const ob11HTTPServer = new OB11HTTPServer();
setTimeout(() => {
for (const [actionName, action] of actionMap) {
for (const method of ['post', 'get']) {
ob11HTTPServer.registerRouter(method, actionName, (res, payload) => {
return action.handle(payload);
});
}
}
}, 0);
class HTTPHeart {
intervalId: NodeJS.Timeout | null = null;
start(NewHeartInterval: number | undefined = undefined) {
let { heartInterval } = ob11Config;
if (NewHeartInterval && !Number.isNaN(NewHeartInterval)) {
heartInterval = NewHeartInterval;
}
if (this.intervalId) {
clearInterval(this.intervalId);
}
this.intervalId = setInterval(() => {
// ws的心跳是ws自己维护的
postOB11Event(new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval), false, false);
}, heartInterval);
}
stop() {
if (this.intervalId) {
clearInterval(this.intervalId);
}
}
}
export const httpHeart = new HTTPHeart();

View File

@ -1,200 +0,0 @@
import { OB11Message, OB11MessageAt, OB11MessageData, OB11MessageReply } from '../types';
import { OB11BaseMetaEvent } from '../event/meta/OB11BaseMetaEvent';
import { OB11BaseNoticeEvent } from '../event/notice/OB11BaseNoticeEvent';
import { WebSocket as WebSocketClass } from 'ws';
import { wsReply } from './ws/reply';
import crypto from 'crypto';
import { ChatType, Group, GroupRequestOperateTypes, Peer } from '@/core/entities';
import { normalize, sendMsg } from '../action/msg/SendMsg';
import { OB11FriendRequestEvent } from '../event/request/OB11FriendRequest';
import { OB11GroupRequestEvent } from '../event/request/OB11GroupRequest';
import { isNull } from '@/common/utils/helper';
import createSendElements from '../action/msg/SendMsg/create-send-elements';
import { NapCatCore } from '@/core';
export type QuickActionEvent = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent
export type PostEventType = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent
interface QuickActionPrivateMessage {
reply?: string;
auto_escape?: boolean;
}
interface QuickActionGroupMessage extends QuickActionPrivateMessage {
// 回复群消息
at_sender?: boolean;
delete?: boolean;
kick?: boolean;
ban?: boolean;
ban_duration?: number;
//
}
interface QuickActionFriendRequest {
approve?: boolean;
remark?: string;
}
interface QuickActionGroupRequest {
approve?: boolean;
reason?: string;
}
export type QuickAction =
QuickActionPrivateMessage
& QuickActionGroupMessage
& QuickActionFriendRequest
& QuickActionGroupRequest
const eventWSList: WebSocketClass[] = [];
export function registerWsEventSender(ws: WebSocketClass) {
eventWSList.push(ws);
}
export function unregisterWsEventSender(ws: WebSocketClass) {
const index = eventWSList.indexOf(ws);
if (index !== -1) {
eventWSList.splice(index, 1);
}
}
export function postWsEvent(event: QuickActionEvent) {
for (const ws of eventWSList) {
new Promise(() => {
wsReply(ws, event);
}).then();
}
}
export function postOB11Event(
msg: QuickActionEvent,
reportSelf = false,
postWs = true,
enablePost = true,
httpSecret: string | undefined = undefined,
coreContext: NapCatCore,
HttpPostUrl: string[]
) {
// 判断msg是否是event
if (!reportSelf) {
if (msg.post_type === 'message' && (msg as OB11Message).user_id.toString() == coreContext.selfInfo.uin) {
return;
}
}
if (enablePost) {
const msgStr = JSON.stringify(msg);
const hmac = crypto.createHmac('sha1', (httpSecret || "").toString());
hmac.update(msgStr);
const sig = hmac.digest('hex');
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'x-self-id': coreContext.selfInfo.uin
};
if (httpSecret) {
headers['x-signature'] = 'sha1=' + sig;
}
for (const host of HttpPostUrl) {
fetch(host, {
method: 'POST',
headers,
body: msgStr
}).then(async (res) => {
//logDebug(`新消息事件HTTP上报成功: ${host} `, msgStr);
// todo: 处理不够优雅应该使用高级泛型进行QuickAction类型识别
let resJson: QuickAction;
try {
resJson = await res.json();
//logDebug('新消息事件HTTP上报返回快速操作: ', JSON.stringify(resJson));
} catch (e) {
coreContext.context.logger.logDebug('新消息事件HTTP上报没有返回快速操作不需要处理');
return;
}
try {
handleQuickOperation(msg as QuickActionEvent, resJson, coreContext).then().catch(coreContext.context.logger.logError);
} catch (e: any) {
coreContext.context.logger.logError('新消息事件HTTP上报返回快速操作失败', e);
}
}, (err: any) => {
coreContext.context.logger.logError(`新消息事件HTTP上报失败: ${host} `, err, msg);
});
}
}
if (postWs) {
postWsEvent(msg);
}
}
async function handleMsg(msg: OB11Message, quickAction: QuickAction, coreContext: NapCatCore) {
const NTQQUserApi = coreContext.getApiContext().UserApi;
msg = msg as OB11Message;
const reply = quickAction.reply;
const peer: Peer = {
chatType: ChatType.friend,
peerUid: await NTQQUserApi.getUidByUin(msg.user_id.toString()) as string
};
if (msg.message_type == 'private') {
if (msg.sub_type === 'group') {
peer.chatType = ChatType.temp;
}
} else {
peer.chatType = ChatType.group;
peer.peerUid = msg.group_id!.toString();
}
if (reply) {
let group: string | undefined;
let replyMessage: OB11MessageData[] = [];
if (msg.message_type == 'group') {
group = msg.group_id!.toString();
replyMessage.push({
type: 'reply',
data: {
id: msg.message_id.toString()
}
} as OB11MessageReply);
if ((quickAction as QuickActionGroupMessage).at_sender) {
replyMessage.push({
type: 'at',
data: {
qq: msg.user_id.toString()
}
} as OB11MessageAt);
}
}
replyMessage = replyMessage.concat(normalize(reply, quickAction.auto_escape));
const { sendElements, deleteAfterSentFiles } = await createSendElements(coreContext, replyMessage, peer);
sendMsg(coreContext, peer, sendElements, deleteAfterSentFiles, false).then().catch(coreContext.context.logger.logError);
}
}
async function handleGroupRequest(request: OB11GroupRequestEvent, quickAction: QuickActionGroupRequest, coreContext: NapCatCore) {
const NTQQGroupApi = coreContext.getApiContext().GroupApi;
if (!isNull(quickAction.approve)) {
NTQQGroupApi.handleGroupRequest(
request.flag,
quickAction.approve ? GroupRequestOperateTypes.approve : GroupRequestOperateTypes.reject,
quickAction.reason,
).then().catch(coreContext.context.logger.logError);
}
}
async function handleFriendRequest(request: OB11FriendRequestEvent, quickAction: QuickActionFriendRequest, coreContext: NapCatCore) {
const NTQQFriendApi = coreContext.getApiContext().FriendApi;
if (!isNull(quickAction.approve)) {
NTQQFriendApi.handleFriendRequest(request.flag, !!quickAction.approve).then().catch(coreContext.context.logger.logError);
}
}
export async function handleQuickOperation(context: QuickActionEvent, quickAction: QuickAction, coreContext: NapCatCore) {
if (context.post_type === 'message') {
handleMsg(context as OB11Message, quickAction, coreContext).then().catch(coreContext.context.logger.logError);
}
if (context.post_type === 'request') {
const friendRequest = context as OB11FriendRequestEvent;
const groupRequest = context as OB11GroupRequestEvent;
if ((friendRequest).request_type === 'friend') {
handleFriendRequest(friendRequest, quickAction, coreContext).then().catch(coreContext.context.logger.logError);
}
else if (groupRequest.request_type === 'group') {
handleGroupRequest(groupRequest, quickAction, coreContext).then().catch(coreContext.context.logger.logError);
}
}
}

View File

@ -1,147 +0,0 @@
import { LifeCycleSubType, OB11LifeCycleEvent } from '../../event/meta/OB11LifeCycleEvent';
import { ActionName } from '../../action/types';
import { OB11Response } from '../../action/OB11Response';
import BaseAction from '../../action/BaseAction';
import { actionMap } from '../../action';
import { postWsEvent, registerWsEventSender, unregisterWsEventSender } from '../postOB11Event';
import { wsReply } from './reply';
import { WebSocket as WebSocketClass } from 'ws';
import { OB11HeartbeatEvent } from '../../event/meta/OB11HeartbeatEvent';
import { log, logDebug, logError } from '../../../common/utils/log';
import { ob11Config } from '@/onebot11/config';
import { napCatCore } from '@/core';
import { selfInfo } from '@/core/data';
export let rwsList: ReverseWebsocket[] = [];
export class ReverseWebsocket {
public websocket: WebSocketClass | undefined;
public url: string;
private running: boolean = false;
public constructor(url: string) {
this.url = url;
this.running = true;
this.connect();
}
public stop() {
this.running = false;
this.websocket!.close();
}
public onopen() {
wsReply(this.websocket!, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
}
public async onmessage(msg: string) {
let receiveData: { action: ActionName | undefined, params: any, echo?: any } = { action: undefined, params: {} };
let echo = null;
try {
receiveData = JSON.parse(msg.toString());
echo = receiveData.echo;
//logDebug('收到反向Websocket消息', receiveData);
} catch (e) {
return wsReply(this.websocket!, OB11Response.error('json解析失败请检查数据格式', 1400, echo));
}
const action: BaseAction<any, any> | undefined = actionMap.get(receiveData.action!);
if (!action) {
return wsReply(this.websocket!, OB11Response.error('不支持的api ' + receiveData.action, 1404, echo));
}
try {
receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证
const handleResult = await action.websocketHandle(receiveData.params, echo);
wsReply(this.websocket!, handleResult);
} catch (e) {
wsReply(this.websocket!, OB11Response.error(`api处理出错:${e}`, 1200, echo));
}
}
public onclose = () => {
logError('反向ws断开', this.url);
unregisterWsEventSender(this.websocket!);
if (this.running) {
this.reconnect();
}
};
public send(msg: string) {
if (this.websocket && this.websocket.readyState == WebSocket.OPEN) {
this.websocket.send(msg);
}
}
private reconnect() {
setTimeout(() => {
this.connect();
}, 3000); // TODO: 重连间隔在配置文件中实现
}
private connect() {
const { token, heartInterval } = ob11Config;
this.websocket = new WebSocketClass(this.url, {
maxPayload: 1024 * 1024 * 1024,
handshakeTimeout: 2000,
perMessageDeflate: false,
headers: {
'X-Self-ID': selfInfo.uin,
'Authorization': `Bearer ${token}`,
'x-client-role': 'Universal', // koishi-adapter-onebot 需要这个字段
'User-Agent': 'OneBot/11',
}
});
registerWsEventSender(this.websocket);
logDebug('Trying to connect to the websocket server: ' + this.url);
this.websocket.on('open', () => {
logDebug('Connected to the websocket server: ' + this.url);
this.onopen();
});
this.websocket.on('message', async (data) => {
await this.onmessage(data.toString());
});
this.websocket.on('error', log);
const wsClientInterval = setInterval(() => {
wsReply(this.websocket!, new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval));
}, heartInterval); // 心跳包
this.websocket.on('close', () => {
clearInterval(wsClientInterval);
logDebug('The websocket connection: ' + this.url + ' closed, trying reconnecting...');
this.onclose();
});
}
}
class OB11ReverseWebsockets {
start() {
for (const url of ob11Config.reverseWs.urls) {
log('开始连接反向ws', url);
new Promise(() => {
try {
rwsList.push(new ReverseWebsocket(url));
} catch (e: any) {
logError(e.stack);
}
}).then();
}
}
stop() {
for (const rws of rwsList) {
rws.stop();
}
rwsList = [];//清空旧的反向ws
}
restart() {
this.stop();
this.start();
}
}
export const ob11ReverseWebsockets = new OB11ReverseWebsockets();

View File

@ -1,86 +0,0 @@
import { WebSocket } from 'ws';
import http from 'http';
import { actionMap } from '../../action';
import { OB11Response } from '../../action/OB11Response';
import { postWsEvent, registerWsEventSender, unregisterWsEventSender } from '../postOB11Event';
import { ActionName } from '../../action/types';
import BaseAction from '../../action/BaseAction';
import { LifeCycleSubType, OB11LifeCycleEvent } from '../../event/meta/OB11LifeCycleEvent';
import { OB11HeartbeatEvent } from '../../event/meta/OB11HeartbeatEvent';
import { WebsocketServerBase } from '@/common/server/websocket';
import { IncomingMessage } from 'node:http';
import { wsReply } from './reply';
import { napCatCore } from '@/core';
import { log, logDebug, logError } from '../../../common/utils/log';
import { ob11Config } from '@/onebot11/config';
import { selfInfo } from '@/core/data';
const heartbeatRunning = false;
class OB11WebsocketServer extends WebsocketServerBase {
public start(port: number | http.Server, host: string = '') {
this.token = ob11Config.token;
super.start(port, host);
}
authorizeFailed(wsClient: WebSocket) {
wsClient.send(JSON.stringify(OB11Response.res(null, 'failed', 1403, 'token验证失败')));
}
async handleAction(wsClient: WebSocket, actionName: string, params: any, echo?: any) {
const action: BaseAction<any, any> | undefined = actionMap.get(actionName);
if (!action) {
return wsReply(wsClient, OB11Response.error('不支持的api ' + actionName, 1404, echo));
}
try {
const handleResult = await action.websocketHandle(params, echo);
wsReply(wsClient, handleResult);
} catch (e: any) {
wsReply(wsClient, OB11Response.error(`api处理出错:${e.stack}`, 1200, echo));
}
}
onConnect(wsClient: WebSocket, url: string, req: IncomingMessage) {
if (url == '/api' || url == '/api/' || url == '/') {
wsClient.on('message', async (msg) => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
let receiveData: { action: ActionName, params?: any, echo?: any } = { action: '', params: {} };
let echo = null;
try {
receiveData = JSON.parse(msg.toString());
echo = receiveData.echo;
logDebug('收到正向Websocket消息', receiveData);
} catch (e) {
return wsReply(wsClient, OB11Response.error('json解析失败请检查数据格式', 1400, echo));
}
receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证
this.handleAction(wsClient, receiveData.action, receiveData.params, receiveData.echo).then();
});
}
if (url == '/event' || url == '/event/' || url == '/') {
registerWsEventSender(wsClient);
logDebug('event上报ws客户端已连接');
try {
wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
} catch (e) {
logError('发送生命周期失败', e);
}
const { heartInterval } = ob11Config;
const wsClientInterval = setInterval(() => {
wsReply(wsClient, new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval));
}, heartInterval); // 心跳包
wsClient.on('close', () => {
logError('event上报ws客户端已断开');
clearInterval(wsClientInterval);
unregisterWsEventSender(wsClient);
});
}
}
}
export const ob11WebsocketServer = new OB11WebsocketServer();

View File

@ -1,23 +0,0 @@
import { WebSocket as WebSocketClass } from 'ws';
import { OB11Response } from '../../action/OB11Response';
import { PostEventType } from '../postOB11Event';
import { log, logDebug, logError } from '@/common/utils/log';
import { isNull } from '@/common/utils/helper';
export function wsReply(wsClient: WebSocketClass, data: OB11Response | PostEventType) {
try {
const packet = Object.assign({}, data);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
if (isNull(packet['echo'])) {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
delete packet['echo'];
}
wsClient.send(JSON.stringify(packet));
logDebug('ws 消息上报', wsClient.url || '', data);
} catch (e: any) {
logError('websocket 回复失败', e.stack, data);
}
}