From e7d138448a66f7696af8ed780f861e133d152d2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=89=8B=E7=93=9C=E4=B8=80=E5=8D=81=E9=9B=AA?= Date: Sat, 16 Nov 2024 18:10:03 +0800 Subject: [PATCH] refactor: reloadNetwork --- src/onebot/config/config.ts | 1 + src/onebot/index.ts | 172 ++++++------------------ src/onebot/network/active-http.ts | 23 +++- src/onebot/network/active-websocket.ts | 44 +++++- src/onebot/network/index.ts | 14 +- src/onebot/network/passive-http.ts | 27 +++- src/onebot/network/passive-websocket.ts | 46 ++++++- 7 files changed, 176 insertions(+), 151 deletions(-) diff --git a/src/onebot/config/config.ts b/src/onebot/config/config.ts index daaaa7a1..bf509bf1 100644 --- a/src/onebot/config/config.ts +++ b/src/onebot/config/config.ts @@ -122,6 +122,7 @@ export const mergeNetworkDefaultConfig = { websocketClients: websocketClientDefaultConfigs, } as const; +export type NetworkConfigAdapter = HttpServerConfig | HttpClientConfig | WebsocketServerConfig | WebsocketClientConfig; type NetworkConfigKeys = keyof typeof mergeNetworkDefaultConfig; export function mergeOneBotConfigs( diff --git a/src/onebot/index.ts b/src/onebot/index.ts index 9349de69..15aecc32 100644 --- a/src/onebot/index.ts +++ b/src/onebot/index.ts @@ -16,9 +16,11 @@ import { } from '@/core'; import { OB11ConfigLoader } from '@/onebot/config'; import { + IOB11NetworkAdapter, OB11ActiveHttpAdapter, OB11ActiveWebSocketAdapter, OB11NetworkManager, + OB11NetworkReloadType, OB11PassiveHttpAdapter, OB11PassiveWebSocketAdapter, } from '@/onebot/network'; @@ -45,7 +47,7 @@ import { OB11GroupRecallNoticeEvent } from '@/onebot/event/notice/OB11GroupRecal import { LRUCache } from '@/common/lru-cache'; import { NodeIKernelRecentContactListener } from '@/core/listeners/NodeIKernelRecentContactListener'; import { BotOfflineEvent } from './event/notice/BotOfflineEvent'; -import { mergeOneBotConfigs, migrateOneBotConfigsV1, OneBotConfig } from './config/config'; +import { mergeOneBotConfigs, migrateOneBotConfigsV1, NetworkConfigAdapter, OneBotConfig } from './config/config'; import { OB11Message } from './types'; //OneBot实现类 @@ -179,152 +181,56 @@ export class NapCatOneBot11Adapter { const newLog = await this.creatOneBotLog(now); this.context.logger.log(`[Notice] [OneBot11] 配置变更前:\n${prevLog}`); this.context.logger.log(`[Notice] [OneBot11] 配置变更后:\n${newLog}`); + const { added: addedHttpServers, removed: removedHttpServers } = this.findDifference(prev.network.httpServers, now.network.httpServers); const { added: addedHttpClients, removed: removedHttpClients } = this.findDifference(prev.network.httpClients, now.network.httpClients); const { added: addedWebSocketServers, removed: removedWebSocketServers } = this.findDifference(prev.network.websocketServers, now.network.websocketServers); const { added: addedWebSocketClients, removed: removedWebSocketClients } = this.findDifference(prev.network.websocketClients, now.network.websocketClients); - // 移除旧的 HTTP 服务器 - for (const server of removedHttpServers) { - await this.networkManager.closeAdapterByPredicate((adapter) => adapter.name === server.name); - } + await this.handleRemovedAdapters(removedHttpServers); + await this.handleRemovedAdapters(removedHttpClients); + await this.handleRemovedAdapters(removedWebSocketServers); + await this.handleRemovedAdapters(removedWebSocketClients); - // 移除旧的 HTTP 客户端 - for (const client of removedHttpClients) { - await this.networkManager.closeAdapterByPredicate((adapter) => adapter.name === client.name); - } + await this.handlerConfigChange(now.network.httpServers); + await this.handlerConfigChange(now.network.httpClients); + await this.handlerConfigChange(now.network.websocketServers); + await this.handlerConfigChange(now.network.websocketClients); - // 移除旧的 WebSocket 服务器 - for (const server of removedWebSocketServers) { - await this.networkManager.closeAdapterByPredicate((adapter) => adapter.name === server.name); - } + await this.handleAddedAdapters(addedHttpServers, OB11PassiveHttpAdapter); + await this.handleAddedAdapters(addedHttpClients, OB11ActiveHttpAdapter); + await this.handleAddedAdapters(addedWebSocketServers, OB11PassiveWebSocketAdapter); + await this.handleAddedAdapters(addedWebSocketClients, OB11ActiveWebSocketAdapter); + } - // 移除旧的 WebSocket 客户端 - for (const client of removedWebSocketClients) { - await this.networkManager.closeAdapterByPredicate((adapter) => adapter.name === client.name); - } + private async handlerConfigChange(adapters: Array) { + for (const adapterConfig of adapters) { + const existingAdapter = this.networkManager.findSomeAdapter(adapterConfig.name); + if (existingAdapter) { + let networkChange = await existingAdapter.reload(adapterConfig); + if (networkChange === OB11NetworkReloadType.NetWorkClose) { + this.networkManager.closeSomeAdapters([existingAdapter]); - // 处理 enable 状态变化的 HTTP 服务器 - for (const server of now.network.httpServers) { - const prevServer = prev.network.httpServers.find(s => s.name === server.name); - if (prevServer && prevServer.enable !== server.enable) { - if (server.enable) { - let adapter = new OB11PassiveHttpAdapter(server.name, server, this.core, this.actions); - adapter.open(); - this.networkManager.registerAdapter( - adapter - ); - } else { - await this.networkManager.closeAdapterByPredicate((adapter) => adapter.name === server.name); } } } - - // 处理 enable 状态变化的 HTTP 客户端 - for (const client of now.network.httpClients) { - const prevClient = prev.network.httpClients.find(c => c.name === client.name); - if (prevClient && prevClient.enable !== client.enable) { - if (client.enable) { - let adapter = new OB11ActiveHttpAdapter(client.name, client, this.core, this, this.actions); - adapter.open(); - this.networkManager.registerAdapter( - adapter - ); - } else { - await this.networkManager.closeAdapterByPredicate((adapter) => adapter.name === client.name); - } - } - } - - // 处理 enable 状态变化的 WebSocket 服务器 - for (const server of now.network.websocketServers) { - const prevServer = prev.network.websocketServers.find(s => s.name === server.name); - if (prevServer && prevServer.enable !== server.enable) { - if (server.enable) { - let adapter = new OB11PassiveWebSocketAdapter( - server.name, - server, - this.core, - this.actions - ); - adapter.open(); - this.networkManager.registerAdapter( - adapter - ); - } else { - await this.networkManager.closeAdapterByPredicate((adapter) => adapter.name === server.name); - } - } - } - - // 处理 enable 状态变化的 WebSocket 客户端 - for (const client of now.network.websocketClients) { - const prevClient = prev.network.websocketClients.find(c => c.name === client.name); - if (prevClient && prevClient.enable !== client.enable) { - if (client.enable) { - let adapter = new OB11ActiveWebSocketAdapter( - client.name, - client, - this.core, - this.actions - ) - this.networkManager.registerAdapter( - adapter - ); - } else { - await this.networkManager.closeAdapterByPredicate((adapter) => adapter.name === client.name); - } - } - } - - // 注册新的 HTTP 服务器 - for (const server of addedHttpServers) { - if (server.enable) { - let adapter = new OB11PassiveHttpAdapter(server.name, server, this.core, this.actions); - adapter.open(); - this.networkManager.registerAdapter(adapter); - } - } - - // 注册新的 HTTP 客户端 - for (const client of addedHttpClients) { - - if (client.enable) { - let adapter = new OB11ActiveHttpAdapter(client.name, client, this.core, this, this.actions); - adapter.open(); - this.networkManager.registerAdapter(adapter); - } - } - - // 注册新的 WebSocket 服务器 - for (const server of addedWebSocketServers) { - if (server.enable) { - let adapter = new OB11PassiveWebSocketAdapter( - server.name, - server, - this.core, - this.actions - ); - adapter.open(); - this.networkManager.registerAdapter(adapter); - } - } - - // 注册新的 WebSocket 客户端 - for (const client of addedWebSocketClients) { - if (client.enable) { - let adapter = new OB11ActiveWebSocketAdapter( - client.name, - client, - this.core, - this.actions - ) - adapter.open(); - this.networkManager.registerAdapter(adapter); - } - } } + private async handleRemovedAdapters(adapters: Array<{ name: string }>): Promise { + for (const adapter of adapters) { + await this.networkManager.closeAdapterByPredicate((existingAdapter) => existingAdapter.name === adapter.name); + } + } + + private async handleAddedAdapters IOB11NetworkAdapter>(addedAdapters: Array, AdapterClass: T) { + for (const adapter of addedAdapters) { + if (adapter.enable) { + const newAdapter = new AdapterClass(adapter.name, adapter, this.core, this.actions); + await newAdapter.open(); + this.networkManager.registerAdapter(newAdapter); + } + } + } private findDifference(prev: T[], now: T[]): { added: T[]; removed: T[] } { const added = now.filter((item) => !prev.includes(item)); const removed = prev.filter((item) => !now.includes(item)); diff --git a/src/onebot/network/active-http.ts b/src/onebot/network/active-http.ts index 1c53b4bd..26975454 100644 --- a/src/onebot/network/active-http.ts +++ b/src/onebot/network/active-http.ts @@ -1,4 +1,4 @@ -import { IOB11NetworkAdapter, OB11EmitEventContent } from '@/onebot/network/index'; +import { IOB11NetworkAdapter, OB11EmitEventContent, OB11NetworkReloadType } from '@/onebot/network/index'; import { createHmac } from 'crypto'; import { LogWrapper } from '@/common/log'; import { QuickAction, QuickActionEvent } from '../types'; @@ -11,7 +11,7 @@ import { ActionMap } from '../action'; export class OB11ActiveHttpAdapter implements IOB11NetworkAdapter { logger: LogWrapper; isEnable: boolean = false; - config: HttpClientConfig; + public config: HttpClientConfig; constructor( public name: string, config: HttpClientConfig, @@ -22,7 +22,7 @@ export class OB11ActiveHttpAdapter implements IOB11NetworkAdapter { this.logger = core.context.logger; this.config = structuredClone(config); } - + onEvent(event: T) { if (!this.isEnable) { @@ -67,7 +67,20 @@ export class OB11ActiveHttpAdapter implements IOB11NetworkAdapter { close() { this.isEnable = false; } - async reload(config: HttpClientConfig){ - this.config = structuredClone(config); + async reload(newconfig: HttpClientConfig) { + const wasEnabled = this.isEnable; + const oldUrl = this.config.url; + this.config = newconfig; + if (newconfig.enable && !wasEnabled) { + this.open(); + return OB11NetworkReloadType.NetWorkOpen; + } else if (!newconfig.enable && wasEnabled) { + this.close(); + return OB11NetworkReloadType.NetWorkClose; + } + if (oldUrl !== newconfig.url) { + return OB11NetworkReloadType.NetWorkReload; + } + return OB11NetworkReloadType.Normal; } } diff --git a/src/onebot/network/active-websocket.ts b/src/onebot/network/active-websocket.ts index 15229140..7d370acb 100644 --- a/src/onebot/network/active-websocket.ts +++ b/src/onebot/network/active-websocket.ts @@ -1,4 +1,4 @@ -import { IOB11NetworkAdapter, OB11EmitEventContent } from '@/onebot/network/index'; +import { IOB11NetworkAdapter, OB11EmitEventContent, OB11NetworkReloadType } from '@/onebot/network/index'; import { WebSocket } from 'ws'; import { OB11HeartbeatEvent } from '../event/meta/OB11HeartbeatEvent'; import { NapCatCore } from '@/core'; @@ -14,7 +14,7 @@ export class OB11ActiveWebSocketAdapter implements IOB11NetworkAdapter { logger: LogWrapper; private connection: WebSocket | null = null; private heartbeatRef: NodeJS.Timeout | null = null; - config: WebsocketClientConfig; + public config: WebsocketClientConfig; constructor( public name: string, @@ -154,7 +154,43 @@ export class OB11ActiveWebSocketAdapter implements IOB11NetworkAdapter { const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name); this.checkStateAndReply({ ...retdata }); } - async reload(config: WebsocketClientConfig) { - + async reload(newConfig: WebsocketClientConfig) { + const wasEnabled = this.isEnable; + const oldUrl = this.config.url; + const oldHeartInterval = this.config.heartInterval; + this.config = newConfig; + + if (newConfig.enable && !wasEnabled) { + this.open(); + return OB11NetworkReloadType.NetWorkOpen; + } else if (!newConfig.enable && wasEnabled) { + this.close(); + return OB11NetworkReloadType.NetWorkClose; + } + + if (oldUrl !== newConfig.url) { + this.close(); + if (newConfig.enable) { + this.open(); + } + return OB11NetworkReloadType.NetWorkReload; + } + + if (oldHeartInterval !== newConfig.heartInterval) { + if (this.heartbeatRef) { + clearInterval(this.heartbeatRef); + this.heartbeatRef = null; + } + if (newConfig.heartInterval > 0 && this.isEnable) { + this.heartbeatRef = setInterval(() => { + if (this.connection && this.connection.readyState === WebSocket.OPEN) { + this.connection.send(JSON.stringify(new OB11HeartbeatEvent(this.core, newConfig.heartInterval, this.core.selfInfo.online ?? true, true))); + } + }, newConfig.heartInterval); + } + return OB11NetworkReloadType.NetWorkReload; + } + + return OB11NetworkReloadType.Normal; } } diff --git a/src/onebot/network/index.ts b/src/onebot/network/index.ts index 68a29bd6..dc8b8b67 100644 --- a/src/onebot/network/index.ts +++ b/src/onebot/network/index.ts @@ -1,21 +1,29 @@ import { OB11BaseEvent } from '@/onebot/event/OB11BaseEvent'; import { OB11Message } from '@/onebot'; import { ActionMap } from '@/onebot/action'; +import { NetworkConfigAdapter } from '../config/config'; export type OB11EmitEventContent = OB11BaseEvent | OB11Message; - +export enum OB11NetworkReloadType { + Normal = 0, + ConfigChange = 1, + NetWorkReload = 2, + NetWorkClose = 3, + NetWorkOpen = 4 +} export interface IOB11NetworkAdapter { actions: ActionMap; name: string; isEnable: boolean; - + config: NetworkConfigAdapter; + onEvent(event: T): void; open(): void | Promise; close(): void | Promise; - reload(config: any): void | Promise; + reload(config: any): OB11NetworkReloadType | Promise; } export class OB11NetworkManager { diff --git a/src/onebot/network/passive-http.ts b/src/onebot/network/passive-http.ts index 97c0196c..c1ca6afc 100644 --- a/src/onebot/network/passive-http.ts +++ b/src/onebot/network/passive-http.ts @@ -1,4 +1,4 @@ -import { IOB11NetworkAdapter } from './index'; +import { IOB11NetworkAdapter, OB11NetworkReloadType } from './index'; import express, { Express, Request, Response } from 'express'; import http from 'http'; import { NapCatCore } from '@/core'; @@ -11,7 +11,7 @@ export class OB11PassiveHttpAdapter implements IOB11NetworkAdapter { private app: Express | undefined; private server: http.Server | undefined; isEnable: boolean = false; - config: HttpServerConfig; + public config: HttpServerConfig; constructor( public name: string, @@ -113,7 +113,28 @@ export class OB11PassiveHttpAdapter implements IOB11NetworkAdapter { return res.json(OB11Response.error('不支持的api ' + actionName, 200)); } } - async reload(config: HttpServerConfig) { + async reload(newConfig: HttpServerConfig) { + const wasEnabled = this.isEnable; + const oldPort = this.config.port; + this.config = newConfig; + + if (newConfig.enable && !wasEnabled) { + this.open(); + return OB11NetworkReloadType.NetWorkOpen; + } else if (!newConfig.enable && wasEnabled) { + this.close(); + return OB11NetworkReloadType.NetWorkClose; + } + + if (oldPort !== newConfig.port) { + this.close(); + if (newConfig.enable) { + this.open(); + } + return OB11NetworkReloadType.NetWorkReload; + } + + return OB11NetworkReloadType.Normal; } } diff --git a/src/onebot/network/passive-websocket.ts b/src/onebot/network/passive-websocket.ts index d37a69b1..f8343dbd 100644 --- a/src/onebot/network/passive-websocket.ts +++ b/src/onebot/network/passive-websocket.ts @@ -1,4 +1,4 @@ -import { IOB11NetworkAdapter, OB11EmitEventContent } from './index'; +import { IOB11NetworkAdapter, OB11EmitEventContent, OB11NetworkReloadType } from './index'; import urlParse from 'url'; import { WebSocket, WebSocketServer } from 'ws'; import { Mutex } from 'async-mutex'; @@ -20,7 +20,7 @@ export class OB11PassiveWebSocketAdapter implements IOB11NetworkAdapter { hasBeenClosed: boolean = false; heartbeatInterval: number = 0; logger: LogWrapper; - config: WebsocketServerConfig; + public config: WebsocketServerConfig; private heartbeatIntervalId: NodeJS.Timeout | null = null; wsClientWithEvent: WebSocket[] = []; @@ -195,8 +195,48 @@ export class OB11PassiveWebSocketAdapter implements IOB11NetworkAdapter { const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name); this.checkStateAndReply({ ...retdata }, wsClient); } - async reload(config: WebsocketServerConfig) { + async reload(newConfig: WebsocketServerConfig) { + const wasEnabled = this.isEnable; + const oldPort = this.config.port; + const oldHost = this.config.host; + const oldHeartbeatInterval = this.heartbeatInterval; + this.config = newConfig; + + if (newConfig.enable && !wasEnabled) { + this.open(); + return OB11NetworkReloadType.NetWorkOpen; + } else if (!newConfig.enable && wasEnabled) { + this.close(); + return OB11NetworkReloadType.NetWorkClose; + } + + if (oldPort !== newConfig.port || oldHost !== newConfig.host) { + this.close(); + this.wsServer = new WebSocketServer({ + port: newConfig.port, + host: newConfig.host === '0.0.0.0' ? '' : newConfig.host, + maxPayload: 1024 * 1024 * 1024, + }); + if (newConfig.enable) { + this.open(); + } + return OB11NetworkReloadType.NetWorkReload; + } + + if (oldHeartbeatInterval !== newConfig.heartInterval) { + if (this.heartbeatIntervalId) { + clearInterval(this.heartbeatIntervalId); + this.heartbeatIntervalId = null; + } + this.heartbeatInterval = newConfig.heartInterval; + if (newConfig.heartInterval > 0 && this.isEnable) { + this.registerHeartBeat(); + } + return OB11NetworkReloadType.NetWorkReload; + } + + return OB11NetworkReloadType.Normal; } }