From a2ad39f78d016b2672ab246a44c76602586df39c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=89=8B=E7=93=9C=E4=B8=80=E5=8D=81=E9=9B=AA?= Date: Sun, 11 Aug 2024 00:08:52 +0800 Subject: [PATCH] chore: websocket --- src/onebot/network/active-websocket.ts | 68 ++++++++++++++++++-------- 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/src/onebot/network/active-websocket.ts b/src/onebot/network/active-websocket.ts index e0ad2e9c..674ef009 100644 --- a/src/onebot/network/active-websocket.ts +++ b/src/onebot/network/active-websocket.ts @@ -1,18 +1,16 @@ import { IOB11NetworkAdapter, OB11EmitEventContent } from '@/onebot/network/index'; - import { WebSocket as NodeWebSocket } from 'ws'; import BaseAction from '@/onebot/action/BaseAction'; -import { OB11BaseEvent } from '@/onebot/event/OB11BaseEvent'; import { sleep } from '@/common/utils/helper'; export class OB11ActiveWebSocketAdapter implements IOB11NetworkAdapter { url: string; reconnectIntervalInMillis: number; isClosed: boolean = false; - private connection: NodeWebSocket | null = null; private actionMap: Map> = new Map(); heartbeatInterval: number; + private heartbeatTimer: NodeJS.Timeout | null = null; constructor(url: string, reconnectIntervalInMillis: number, heartbeatInterval: number) { this.url = url; @@ -21,7 +19,14 @@ export class OB11ActiveWebSocketAdapter implements IOB11NetworkAdapter { } registerHeartBeat() { - //WS反向心跳 + // WS反向心跳 + if (this.connection) { + this.heartbeatTimer = setInterval(() => { + if (this.connection && this.connection.readyState === NodeWebSocket.OPEN) { + this.connection.ping(); + } + }, this.heartbeatInterval); + } } registerAction, P, R>(action: T) { @@ -30,9 +35,8 @@ export class OB11ActiveWebSocketAdapter implements IOB11NetworkAdapter { onEvent(event: T) { if (this.connection) { - // this.connection.send(JSON.stringify(event)); - // TODO: wrap the event, and send the wrapped to the server. - // TODO: consider using a utility function + const wrappedEvent = this.wrapEvent(event); + this.connection.send(JSON.stringify(wrappedEvent)); } } @@ -44,37 +48,59 @@ export class OB11ActiveWebSocketAdapter implements IOB11NetworkAdapter { } close() { - this.isClosed = true; if (this.isClosed) { throw new Error('Cannot close a closed WebSocket connection'); } + this.isClosed = true; if (this.connection) { this.connection.close(); this.connection = null; } + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } } private async tryConnect() { while (!this.connection) { try { this.connection = new NodeWebSocket(this.url); + this.connection.on('message', (data) => { + this.handleMessage(data); + }); + this.connection.once('close', () => { + if (!this.isClosed) { + this.connection = null; + setTimeout(() => this.tryConnect(), this.reconnectIntervalInMillis); + } + }); + this.registerHeartBeat(); } catch (e) { this.connection = null; console.error('Failed to connect to the server, retrying in 5 seconds...'); await sleep(5000); } } - - this.connection.on('message', (data) => { - // TODO: extract action name and payload from the message, then call the corresponding action. - // TODO: consider using a utility function - }); - - this.connection.once('close', () => { - if (!this.isClosed) { - this.connection = new NodeWebSocket(this.url); - this.tryConnect(); - } - }); } -} + + private handleMessage(data: any) { + try { + const message = JSON.parse(data); + const action = this.actionMap.get(message.actionName); + if (action) { + action.handle(message.payload); + } + } catch (e) { + console.error('Failed to handle message:', e); + } + } + + private wrapEvent(event: T) { + // Wrap the event as needed + return { + type: 'event', + data: event + }; + } +} \ No newline at end of file