From a849b5edc0486d549274a312ffeb4f3e2f85d5c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=89=8B=E7=93=9C=E4=B8=80=E5=8D=81=E9=9B=AA?= Date: Fri, 21 Mar 2025 16:07:47 +0800 Subject: [PATCH] feat: websocket server --- src/onebot/network/websocket-server.ts | 178 ++++++++++++++++++++++--- 1 file changed, 160 insertions(+), 18 deletions(-) diff --git a/src/onebot/network/websocket-server.ts b/src/onebot/network/websocket-server.ts index c679835b..a5c0da22 100644 --- a/src/onebot/network/websocket-server.ts +++ b/src/onebot/network/websocket-server.ts @@ -5,20 +5,44 @@ import { WebsocketServerConfig } from '@/onebot/config/config'; import { NapCatOneBot11Adapter } from '@/onebot'; import { IOB11NetworkAdapter } from '@/onebot/network/adapter'; import { serve } from '@hono/node-server'; -import { Hono } from 'hono'; +import { Context, Hono } from 'hono'; import { createNodeWebSocket } from '@hono/node-ws'; import { WSContext, WSMessageReceive } from 'hono/ws'; +import { OB11Response } from '../action/OneBotAction'; +import { ActionName } from '../action/router'; +import { OB11HeartbeatEvent } from '@/onebot/event/meta/OB11HeartbeatEvent'; +import { LifeCycleSubType, OB11LifeCycleEvent } from '@/onebot/event/meta/OB11LifeCycleEvent'; + export class OB11WebsocketServerAdapter extends IOB11NetworkAdapter { private app: Hono | undefined; private server: ReturnType | undefined; + private clients: Set> = new Set(); + private eventClients: Set> = new Set(); // 仅用于接收事件的客户端 + private heartbeatIntervalId: NodeJS.Timeout | null = null; constructor(name: string, config: WebsocketServerConfig, core: NapCatCore, obContext: NapCatOneBot11Adapter, actions: ActionMap) { super(name, config, core, obContext, actions); } - // eslint-disable-next-line @typescript-eslint/no-unused-vars - override onEvent(_event: T) { - // Websocket server is passive, no need to emit event + override onEvent(event: T) { + if (!this.isEnable || this.eventClients.size === 0) return; + + try { + const eventData = JSON.stringify(event); + this.eventClients.forEach(client => { + try { + client.send(eventData); + } catch (e) { + this.core.context.logger.logError(`[OneBot] [Websocket Server Adapter] 向客户端发送事件失败: ${e}`); + } + }); + + if (this.config.debug) { + this.core.context.logger.logDebug(`[OneBot] [Websocket Server Adapter] 已广播事件到 ${this.eventClients.size} 个客户端`); + } + } catch (e) { + this.core.context.logger.logError(`[OneBot] [Websocket Server Adapter] 事件序列化失败: ${e}`); + } } open() { @@ -29,6 +53,11 @@ export class OB11WebsocketServerAdapter extends IOB11NetworkAdapter 0) { + this.registerHeartBeat(); + } } catch (e) { this.core.context.logger.logError(`[OneBot] [Websocket Server Adapter] 启动错误: ${e}`); } @@ -36,46 +65,146 @@ export class OB11WebsocketServerAdapter extends IOB11NetworkAdapter { + if (!this.isEnable || this.eventClients.size === 0) return; + + try { + const heartbeatEvent = new OB11HeartbeatEvent( + this.core, + this.config.heartInterval, + this.core.selfInfo.online ?? true, + true + ); + + const eventData = JSON.stringify(heartbeatEvent); + this.eventClients.forEach(client => { + try { + client.send(eventData); + } catch (e) { + this.core.context.logger.logError(`[OneBot] [Websocket Server Adapter] 发送心跳失败: ${e}`); + } + }); + } catch (e) { + this.core.context.logger.logError(`[OneBot] [Websocket Server Adapter] 心跳事件生成失败: ${e}`); + } + }, this.config.heartInterval); + } + private initializeServer() { this.app = new Hono(); - const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app: this.app }) - this.app.all('/*', upgradeWebSocket(c => { - return { - onMessage: (evt, ws) => { - this.actionHandler(evt, ws) - }, + const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app: this.app }); + + // 处理所有WebSocket请求 + this.app.all('/*', upgradeWebSocket((c) => { + // 鉴权处理 + if (this.config.token && this.config.token.length > 0) { + const url = new URL(c.req.url, `http://${c.req.header('host') || 'localhost'}`); + const queryToken = url.searchParams.get('access_token'); + const authHeader = c.req.header('authorization'); + const headerToken = authHeader?.startsWith('Bearer ') ? authHeader.substring(7) : ''; + const clientToken = queryToken || headerToken; + + if (clientToken !== this.config.token) { + return { + onOpen: (_evt, ws) => { + ws.send(JSON.stringify(OB11Response.res(null, 'failed', 1403, 'token验证失败'))); + ws.close(); + } + }; + } } + + // 判断连接类型 + const url = new URL(c.req.url, `http://${c.req.header('host') || 'localhost'}`); + const path = url.pathname; + const isApiConnect = path === '/api' || path === '/api/'; + + return { + onOpen: (_evt, ws) => { + this.clients.add(ws); + + // 仅对非API连接添加到事件客户端列表 + if (!isApiConnect) { + this.eventClients.add(ws); + // 发送连接生命周期事件 + try { + ws.send(JSON.stringify(new OB11LifeCycleEvent(this.core, LifeCycleSubType.CONNECT))); + } catch (e) { + this.core.context.logger.logError(`[OneBot] [Websocket Server Adapter] 发送生命周期事件失败: ${e}`); + } + } + + this.core.context.logger.log(`[OneBot] [Websocket Server Adapter] 客户端已连接,类型: ${isApiConnect ? 'API' : '事件'},当前连接数: ${this.clients.size}`); + }, + onMessage: (evt, ws) => { + this.actionHandler(c, evt, ws); + }, + onClose: (_evt, ws) => { + this.clients.delete(ws); + this.eventClients.delete(ws); + this.core.context.logger.log(`[OneBot] [Websocket Server Adapter] 客户端已断开,当前连接数: ${this.clients.size}`); + }, + onError: (error) => { + this.core.context.logger.logError(`[OneBot] [Websocket Server Adapter] WebSocket错误: ${error}`); + } + }; })); // 启动服务器 this.server = serve({ fetch: this.app.fetch.bind(this.app), port: this.config.port, + hostname: this.config.host === '0.0.0.0' ? undefined : this.config.host, }); + injectWebSocket(this.server); - this.core.context.logger.log(`[OneBot] [Websocket Server Adapter] 服务器已启动于端口 ${this.config.port}`); + this.core.context.logger.log(`[OneBot] [Websocket Server Adapter] 服务器已启动于 ${this.config.host}:${this.config.port}`); } - - /** - * API动作处理器 - */ - async actionHandler(evt: MessageEvent, ws: WSContext) { + async actionHandler(_c: Context, evt: MessageEvent, ws: WSContext) { const { data } = evt; if (typeof data !== 'string') { this.core.context.logger.logError('[OneBot] [Websocket Server Adapter] 收到非字符串消息'); return; } - const { action, params } = JSON.parse(data); + let receiveData: { action: typeof ActionName[keyof typeof ActionName], params?: any, echo?: any } = { action: ActionName.Unknown, params: {} }; + let echo = undefined; + try { + receiveData = JSON.parse(data); + echo = receiveData.echo; + } catch { + return ws.send(JSON.stringify(OB11Response.error('json解析失败,请检查数据格式', 1400, echo))); + } + receiveData.params = (receiveData?.params) ? receiveData.params : {}; // 兼容类型验证 + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const action = this.actions.get(receiveData.action as any); + if (!action) { + this.logger.logError('[OneBot] [WebSocket Client] 发生错误', '不支持的API ' + receiveData.action); + return ws.send(JSON.stringify(OB11Response.error('不支持的API ' + receiveData.action, 1404, echo))); + } + const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name, this.config); + ws.send(JSON.stringify({ ...retdata })); } async reload(newConfig: WebsocketServerConfig) { const wasEnabled = this.isEnable; const oldPort = this.config.port; + const oldHost = this.config.host; + const oldHeartInterval = this.config.heartInterval; this.config = newConfig; if (newConfig.enable && !wasEnabled) { @@ -86,7 +215,8 @@ export class OB11WebsocketServerAdapter extends IOB11NetworkAdapter 0 && this.isEnable) { + this.registerHeartBeat(); + } + return OB11NetworkReloadType.NetWorkReload; + } + return OB11NetworkReloadType.Normal; } } \ No newline at end of file