mirror of
https://github.com/NapNeko/NapCatQQ.git
synced 2024-11-21 09:36:35 +00:00
chore: server
This commit is contained in:
parent
cb03501eff
commit
cdc10d6c4b
57
src/onebot/server/http.ts
Normal file
57
src/onebot/server/http.ts
Normal file
@ -0,0 +1,57 @@
|
||||
import { Response } from 'express';
|
||||
import { OB11Response } from '../action/OB11Response';
|
||||
import { HttpServerBase } from '@/common/server/http';
|
||||
import { OB11HeartbeatEvent } from '@/onebot/event/meta/OB11HeartbeatEvent';
|
||||
import { postOB11Event } from '@/onebot/server/postOB11Event';
|
||||
|
||||
class OB11HTTPServer extends HttpServerBase {
|
||||
name = 'OneBot V11 server';
|
||||
|
||||
handleFailed(res: Response, payload: any, e: Error) {
|
||||
res.send(OB11Response.error(e?.stack?.toString() || e.message || 'Error Handle', 200));
|
||||
}
|
||||
|
||||
protected listen(port: number, host: string) {
|
||||
if (ob11Config.http.enable) {
|
||||
super.listen(port, host);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const ob11HTTPServer = new OB11HTTPServer();
|
||||
|
||||
setTimeout(() => {
|
||||
for (const [actionName, action] of actionMap) {
|
||||
for (const method of ['post', 'get']) {
|
||||
ob11HTTPServer.registerRouter(method, actionName, (res, payload) => {
|
||||
return action.handle(payload);
|
||||
});
|
||||
}
|
||||
}
|
||||
}, 0);
|
||||
|
||||
|
||||
class HTTPHeart {
|
||||
intervalId: NodeJS.Timeout | null = null;
|
||||
start(NewHeartInterval: number | undefined = undefined) {
|
||||
let { heartInterval } = ob11Config;
|
||||
if (NewHeartInterval && !Number.isNaN(NewHeartInterval)) {
|
||||
heartInterval = NewHeartInterval;
|
||||
}
|
||||
if (this.intervalId) {
|
||||
clearInterval(this.intervalId);
|
||||
}
|
||||
this.intervalId = setInterval(() => {
|
||||
// ws的心跳是ws自己维护的
|
||||
postOB11Event(new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval), false, false);
|
||||
}, heartInterval);
|
||||
}
|
||||
|
||||
stop() {
|
||||
if (this.intervalId) {
|
||||
clearInterval(this.intervalId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const httpHeart = new HTTPHeart();
|
195
src/onebot/server/postOB11Event.ts
Normal file
195
src/onebot/server/postOB11Event.ts
Normal file
@ -0,0 +1,195 @@
|
||||
import { OB11Message, OB11MessageAt, OB11MessageData, OB11MessageReply } from '../types';
|
||||
import { OB11BaseMetaEvent } from '../event/meta/OB11BaseMetaEvent';
|
||||
import { OB11BaseNoticeEvent } from '../event/notice/OB11BaseNoticeEvent';
|
||||
import { WebSocket as WebSocketClass } from 'ws';
|
||||
import { wsReply } from './ws/reply';
|
||||
import crypto from 'crypto';
|
||||
import { ChatType, Group, GroupRequestOperateTypes, Peer } from '@/core/entities';
|
||||
import { normalize, sendMsg } from '../action/msg/SendMsg';
|
||||
import { OB11FriendRequestEvent } from '../event/request/OB11FriendRequest';
|
||||
import { OB11GroupRequestEvent } from '../event/request/OB11GroupRequest';
|
||||
import { isNull } from '@/common/utils/helper';
|
||||
import { NTQQFriendApi, NTQQGroupApi, NTQQUserApi } from '@/core/apis';
|
||||
import createSendElements from '../action/msg/SendMsg/create-send-elements';
|
||||
import { NapCatCore } from '@/core';
|
||||
|
||||
export type QuickActionEvent = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent
|
||||
export type PostEventType = OB11Message | OB11BaseMetaEvent | OB11BaseNoticeEvent
|
||||
|
||||
interface QuickActionPrivateMessage {
|
||||
reply?: string;
|
||||
auto_escape?: boolean;
|
||||
}
|
||||
|
||||
interface QuickActionGroupMessage extends QuickActionPrivateMessage {
|
||||
// 回复群消息
|
||||
at_sender?: boolean;
|
||||
delete?: boolean;
|
||||
kick?: boolean;
|
||||
ban?: boolean;
|
||||
ban_duration?: number;
|
||||
//
|
||||
}
|
||||
|
||||
interface QuickActionFriendRequest {
|
||||
approve?: boolean;
|
||||
remark?: string;
|
||||
}
|
||||
|
||||
interface QuickActionGroupRequest {
|
||||
approve?: boolean;
|
||||
reason?: string;
|
||||
}
|
||||
|
||||
export type QuickAction =
|
||||
QuickActionPrivateMessage
|
||||
& QuickActionGroupMessage
|
||||
& QuickActionFriendRequest
|
||||
& QuickActionGroupRequest
|
||||
|
||||
const eventWSList: WebSocketClass[] = [];
|
||||
|
||||
export function registerWsEventSender(ws: WebSocketClass) {
|
||||
eventWSList.push(ws);
|
||||
}
|
||||
|
||||
export function unregisterWsEventSender(ws: WebSocketClass) {
|
||||
const index = eventWSList.indexOf(ws);
|
||||
if (index !== -1) {
|
||||
eventWSList.splice(index, 1);
|
||||
}
|
||||
}
|
||||
|
||||
export function postWsEvent(event: QuickActionEvent) {
|
||||
for (const ws of eventWSList) {
|
||||
new Promise(() => {
|
||||
wsReply(ws, event);
|
||||
}).then();
|
||||
}
|
||||
}
|
||||
|
||||
export function postOB11Event(msg: QuickActionEvent, reportSelf = false, postWs = true) {
|
||||
const config = ob11Config;
|
||||
|
||||
// 判断msg是否是event
|
||||
if (!config.reportSelfMessage && !reportSelf) {
|
||||
if (msg.post_type === 'message' && (msg as OB11Message).user_id.toString() == selfInfo.uin) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (config.http.enablePost) {
|
||||
const msgStr = JSON.stringify(msg);
|
||||
const hmac = crypto.createHmac('sha1', ob11Config.http.secret);
|
||||
hmac.update(msgStr);
|
||||
const sig = hmac.digest('hex');
|
||||
const headers: Record<string, string> = {
|
||||
'Content-Type': 'application/json',
|
||||
'x-self-id': selfInfo.uin
|
||||
};
|
||||
if (config.http.secret) {
|
||||
headers['x-signature'] = 'sha1=' + sig;
|
||||
}
|
||||
for (const host of config.http.postUrls) {
|
||||
fetch(host, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: msgStr
|
||||
}).then(async (res) => {
|
||||
//logDebug(`新消息事件HTTP上报成功: ${host} `, msgStr);
|
||||
// todo: 处理不够优雅,应该使用高级泛型进行QuickAction类型识别
|
||||
let resJson: QuickAction;
|
||||
try {
|
||||
resJson = await res.json();
|
||||
//logDebug('新消息事件HTTP上报返回快速操作: ', JSON.stringify(resJson));
|
||||
} catch (e) {
|
||||
logDebug('新消息事件HTTP上报没有返回快速操作,不需要处理');
|
||||
return;
|
||||
}
|
||||
try {
|
||||
handleQuickOperation(msg as QuickActionEvent, resJson).then().catch(logError);
|
||||
} catch (e: any) {
|
||||
logError('新消息事件HTTP上报返回快速操作失败', e);
|
||||
}
|
||||
|
||||
}, (err: any) => {
|
||||
logError(`新消息事件HTTP上报失败: ${host} `, err, msg);
|
||||
});
|
||||
}
|
||||
}
|
||||
if (postWs) {
|
||||
postWsEvent(msg);
|
||||
}
|
||||
}
|
||||
async function handleMsg(msg: OB11Message, quickAction: QuickAction, coreContext: NapCatCore) {
|
||||
const NTQQUserApi = coreContext.getApiContext().UserApi;
|
||||
msg = msg as OB11Message;
|
||||
const reply = quickAction.reply;
|
||||
const peer: Peer = {
|
||||
chatType: ChatType.friend,
|
||||
peerUid: await NTQQUserApi.getUidByUin(msg.user_id.toString()) as string
|
||||
};
|
||||
if (msg.message_type == 'private') {
|
||||
if (msg.sub_type === 'group') {
|
||||
peer.chatType = ChatType.temp;
|
||||
}
|
||||
} else {
|
||||
peer.chatType = ChatType.group;
|
||||
peer.peerUid = msg.group_id!.toString();
|
||||
}
|
||||
if (reply) {
|
||||
let group: Group | undefined;
|
||||
let replyMessage: OB11MessageData[] = [];
|
||||
|
||||
if (msg.message_type == 'group') {
|
||||
group = await getGroup(msg.group_id!.toString());
|
||||
replyMessage.push({
|
||||
type: 'reply',
|
||||
data: {
|
||||
id: msg.message_id.toString()
|
||||
}
|
||||
} as OB11MessageReply);
|
||||
if ((quickAction as QuickActionGroupMessage).at_sender) {
|
||||
replyMessage.push({
|
||||
type: 'at',
|
||||
data: {
|
||||
qq: msg.user_id.toString()
|
||||
}
|
||||
} as OB11MessageAt);
|
||||
}
|
||||
}
|
||||
replyMessage = replyMessage.concat(normalize(reply, quickAction.auto_escape));
|
||||
const { sendElements, deleteAfterSentFiles } = await createSendElements(replyMessage, peer);
|
||||
sendMsg(peer, sendElements, deleteAfterSentFiles, false).then().catch(logError);
|
||||
}
|
||||
}
|
||||
async function handleGroupRequest(request: OB11GroupRequestEvent, quickAction: QuickActionGroupRequest, coreContext: NapCatCore) {
|
||||
const NTQQGroupApi = coreContext.getApiContext().GroupApi;
|
||||
if (!isNull(quickAction.approve)) {
|
||||
NTQQGroupApi.handleGroupRequest(
|
||||
request.flag,
|
||||
quickAction.approve ? GroupRequestOperateTypes.approve : GroupRequestOperateTypes.reject,
|
||||
quickAction.reason,
|
||||
).then().catch(coreContext.context.logger.logError);
|
||||
}
|
||||
}
|
||||
async function handleFriendRequest(request: OB11FriendRequestEvent, quickAction: QuickActionFriendRequest, coreContext: NapCatCore) {
|
||||
const NTQQFriendApi = coreContext.getApiContext().FriendApi;
|
||||
if (!isNull(quickAction.approve)) {
|
||||
NTQQFriendApi.handleFriendRequest(request.flag, !!quickAction.approve).then().catch(coreContext.context.logger.logError);
|
||||
}
|
||||
}
|
||||
export async function handleQuickOperation(context: QuickActionEvent, quickAction: QuickAction, coreContext: NapCatCore) {
|
||||
if (context.post_type === 'message') {
|
||||
handleMsg(context as OB11Message, quickAction, coreContext).then().catch(coreContext.context.logger.logError);
|
||||
}
|
||||
if (context.post_type === 'request') {
|
||||
const friendRequest = context as OB11FriendRequestEvent;
|
||||
const groupRequest = context as OB11GroupRequestEvent;
|
||||
if ((friendRequest).request_type === 'friend') {
|
||||
handleFriendRequest(friendRequest, quickAction, coreContext).then().catch(coreContext.context.logger.logError);
|
||||
}
|
||||
else if (groupRequest.request_type === 'group') {
|
||||
handleGroupRequest(groupRequest, quickAction, coreContext).then().catch(coreContext.context.logger.logError);
|
||||
}
|
||||
}
|
||||
}
|
147
src/onebot/server/ws/ReverseWebsocket.ts
Normal file
147
src/onebot/server/ws/ReverseWebsocket.ts
Normal file
@ -0,0 +1,147 @@
|
||||
import { LifeCycleSubType, OB11LifeCycleEvent } from '../../event/meta/OB11LifeCycleEvent';
|
||||
import { ActionName } from '../../action/types';
|
||||
import { OB11Response } from '../../action/OB11Response';
|
||||
import BaseAction from '../../action/BaseAction';
|
||||
import { actionMap } from '../../action';
|
||||
import { postWsEvent, registerWsEventSender, unregisterWsEventSender } from '../postOB11Event';
|
||||
import { wsReply } from './reply';
|
||||
import { WebSocket as WebSocketClass } from 'ws';
|
||||
import { OB11HeartbeatEvent } from '../../event/meta/OB11HeartbeatEvent';
|
||||
import { log, logDebug, logError } from '../../../common/utils/log';
|
||||
import { ob11Config } from '@/onebot11/config';
|
||||
import { napCatCore } from '@/core';
|
||||
import { selfInfo } from '@/core/data';
|
||||
|
||||
export let rwsList: ReverseWebsocket[] = [];
|
||||
|
||||
export class ReverseWebsocket {
|
||||
public websocket: WebSocketClass | undefined;
|
||||
public url: string;
|
||||
private running: boolean = false;
|
||||
|
||||
public constructor(url: string) {
|
||||
this.url = url;
|
||||
this.running = true;
|
||||
this.connect();
|
||||
}
|
||||
|
||||
public stop() {
|
||||
this.running = false;
|
||||
this.websocket!.close();
|
||||
}
|
||||
|
||||
public onopen() {
|
||||
wsReply(this.websocket!, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
|
||||
}
|
||||
|
||||
public async onmessage(msg: string) {
|
||||
let receiveData: { action: ActionName | undefined, params: any, echo?: any } = { action: undefined, params: {} };
|
||||
let echo = null;
|
||||
try {
|
||||
receiveData = JSON.parse(msg.toString());
|
||||
echo = receiveData.echo;
|
||||
//logDebug('收到反向Websocket消息', receiveData);
|
||||
} catch (e) {
|
||||
return wsReply(this.websocket!, OB11Response.error('json解析失败,请检查数据格式', 1400, echo));
|
||||
}
|
||||
const action: BaseAction<any, any> | undefined = actionMap.get(receiveData.action!);
|
||||
if (!action) {
|
||||
return wsReply(this.websocket!, OB11Response.error('不支持的api ' + receiveData.action, 1404, echo));
|
||||
}
|
||||
try {
|
||||
receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证
|
||||
const handleResult = await action.websocketHandle(receiveData.params, echo);
|
||||
wsReply(this.websocket!, handleResult);
|
||||
} catch (e) {
|
||||
wsReply(this.websocket!, OB11Response.error(`api处理出错:${e}`, 1200, echo));
|
||||
}
|
||||
}
|
||||
|
||||
public onclose = () => {
|
||||
logError('反向ws断开', this.url);
|
||||
unregisterWsEventSender(this.websocket!);
|
||||
if (this.running) {
|
||||
this.reconnect();
|
||||
}
|
||||
};
|
||||
|
||||
public send(msg: string) {
|
||||
if (this.websocket && this.websocket.readyState == WebSocket.OPEN) {
|
||||
this.websocket.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private reconnect() {
|
||||
setTimeout(() => {
|
||||
this.connect();
|
||||
}, 3000); // TODO: 重连间隔在配置文件中实现
|
||||
}
|
||||
|
||||
private connect() {
|
||||
const { token, heartInterval } = ob11Config;
|
||||
this.websocket = new WebSocketClass(this.url, {
|
||||
maxPayload: 1024 * 1024 * 1024,
|
||||
handshakeTimeout: 2000,
|
||||
perMessageDeflate: false,
|
||||
headers: {
|
||||
'X-Self-ID': selfInfo.uin,
|
||||
'Authorization': `Bearer ${token}`,
|
||||
'x-client-role': 'Universal', // koishi-adapter-onebot 需要这个字段
|
||||
'User-Agent': 'OneBot/11',
|
||||
}
|
||||
});
|
||||
registerWsEventSender(this.websocket);
|
||||
logDebug('Trying to connect to the websocket server: ' + this.url);
|
||||
|
||||
|
||||
this.websocket.on('open', () => {
|
||||
logDebug('Connected to the websocket server: ' + this.url);
|
||||
this.onopen();
|
||||
});
|
||||
|
||||
this.websocket.on('message', async (data) => {
|
||||
await this.onmessage(data.toString());
|
||||
});
|
||||
|
||||
this.websocket.on('error', log);
|
||||
|
||||
const wsClientInterval = setInterval(() => {
|
||||
wsReply(this.websocket!, new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval));
|
||||
}, heartInterval); // 心跳包
|
||||
this.websocket.on('close', () => {
|
||||
clearInterval(wsClientInterval);
|
||||
logDebug('The websocket connection: ' + this.url + ' closed, trying reconnecting...');
|
||||
this.onclose();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class OB11ReverseWebsockets {
|
||||
start() {
|
||||
for (const url of ob11Config.reverseWs.urls) {
|
||||
log('开始连接反向ws', url);
|
||||
new Promise(() => {
|
||||
try {
|
||||
rwsList.push(new ReverseWebsocket(url));
|
||||
} catch (e: any) {
|
||||
logError(e.stack);
|
||||
}
|
||||
}).then();
|
||||
}
|
||||
}
|
||||
|
||||
stop() {
|
||||
for (const rws of rwsList) {
|
||||
rws.stop();
|
||||
}
|
||||
rwsList = [];//清空旧的反向ws
|
||||
}
|
||||
|
||||
restart() {
|
||||
this.stop();
|
||||
this.start();
|
||||
}
|
||||
}
|
||||
|
||||
export const ob11ReverseWebsockets = new OB11ReverseWebsockets();
|
||||
|
86
src/onebot/server/ws/WebsocketServer.ts
Normal file
86
src/onebot/server/ws/WebsocketServer.ts
Normal file
@ -0,0 +1,86 @@
|
||||
import { WebSocket } from 'ws';
|
||||
import http from 'http';
|
||||
import { actionMap } from '../../action';
|
||||
import { OB11Response } from '../../action/OB11Response';
|
||||
import { postWsEvent, registerWsEventSender, unregisterWsEventSender } from '../postOB11Event';
|
||||
import { ActionName } from '../../action/types';
|
||||
import BaseAction from '../../action/BaseAction';
|
||||
import { LifeCycleSubType, OB11LifeCycleEvent } from '../../event/meta/OB11LifeCycleEvent';
|
||||
import { OB11HeartbeatEvent } from '../../event/meta/OB11HeartbeatEvent';
|
||||
import { WebsocketServerBase } from '@/common/server/websocket';
|
||||
import { IncomingMessage } from 'node:http';
|
||||
import { wsReply } from './reply';
|
||||
import { napCatCore } from '@/core';
|
||||
import { log, logDebug, logError } from '../../../common/utils/log';
|
||||
import { ob11Config } from '@/onebot11/config';
|
||||
import { selfInfo } from '@/core/data';
|
||||
|
||||
const heartbeatRunning = false;
|
||||
|
||||
class OB11WebsocketServer extends WebsocketServerBase {
|
||||
|
||||
public start(port: number | http.Server, host: string = '') {
|
||||
this.token = ob11Config.token;
|
||||
super.start(port, host);
|
||||
}
|
||||
|
||||
authorizeFailed(wsClient: WebSocket) {
|
||||
wsClient.send(JSON.stringify(OB11Response.res(null, 'failed', 1403, 'token验证失败')));
|
||||
}
|
||||
|
||||
async handleAction(wsClient: WebSocket, actionName: string, params: any, echo?: any) {
|
||||
const action: BaseAction<any, any> | undefined = actionMap.get(actionName);
|
||||
if (!action) {
|
||||
return wsReply(wsClient, OB11Response.error('不支持的api ' + actionName, 1404, echo));
|
||||
}
|
||||
try {
|
||||
const handleResult = await action.websocketHandle(params, echo);
|
||||
wsReply(wsClient, handleResult);
|
||||
} catch (e: any) {
|
||||
wsReply(wsClient, OB11Response.error(`api处理出错:${e.stack}`, 1200, echo));
|
||||
}
|
||||
}
|
||||
|
||||
onConnect(wsClient: WebSocket, url: string, req: IncomingMessage) {
|
||||
if (url == '/api' || url == '/api/' || url == '/') {
|
||||
wsClient.on('message', async (msg) => {
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-expect-error
|
||||
let receiveData: { action: ActionName, params?: any, echo?: any } = { action: '', params: {} };
|
||||
let echo = null;
|
||||
try {
|
||||
receiveData = JSON.parse(msg.toString());
|
||||
echo = receiveData.echo;
|
||||
logDebug('收到正向Websocket消息', receiveData);
|
||||
} catch (e) {
|
||||
return wsReply(wsClient, OB11Response.error('json解析失败,请检查数据格式', 1400, echo));
|
||||
}
|
||||
receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证
|
||||
this.handleAction(wsClient, receiveData.action, receiveData.params, receiveData.echo).then();
|
||||
});
|
||||
}
|
||||
if (url == '/event' || url == '/event/' || url == '/') {
|
||||
registerWsEventSender(wsClient);
|
||||
|
||||
logDebug('event上报ws客户端已连接');
|
||||
|
||||
try {
|
||||
wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
|
||||
} catch (e) {
|
||||
logError('发送生命周期失败', e);
|
||||
}
|
||||
const { heartInterval } = ob11Config;
|
||||
const wsClientInterval = setInterval(() => {
|
||||
wsReply(wsClient, new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval));
|
||||
}, heartInterval); // 心跳包
|
||||
wsClient.on('close', () => {
|
||||
logError('event上报ws客户端已断开');
|
||||
clearInterval(wsClientInterval);
|
||||
unregisterWsEventSender(wsClient);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const ob11WebsocketServer = new OB11WebsocketServer();
|
||||
|
23
src/onebot/server/ws/reply.ts
Normal file
23
src/onebot/server/ws/reply.ts
Normal file
@ -0,0 +1,23 @@
|
||||
import { WebSocket as WebSocketClass } from 'ws';
|
||||
import { OB11Response } from '../../action/OB11Response';
|
||||
import { PostEventType } from '../postOB11Event';
|
||||
import { log, logDebug, logError } from '@/common/utils/log';
|
||||
import { isNull } from '@/common/utils/helper';
|
||||
|
||||
|
||||
export function wsReply(wsClient: WebSocketClass, data: OB11Response | PostEventType) {
|
||||
try {
|
||||
const packet = Object.assign({}, data);
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-expect-error
|
||||
if (isNull(packet['echo'])) {
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-expect-error
|
||||
delete packet['echo'];
|
||||
}
|
||||
wsClient.send(JSON.stringify(packet));
|
||||
logDebug('ws 消息上报', wsClient.url || '', data);
|
||||
} catch (e: any) {
|
||||
logError('websocket 回复失败', e.stack, data);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user