feat: Asynchronous connect reverse websocket

This commit is contained in:
Disy 2024-02-19 13:54:09 +08:00
parent 9b0f2d0983
commit acb1ec3871
4 changed files with 52 additions and 58 deletions

View File

@ -2,7 +2,7 @@ import {log} from "../common/utils";
const WebSocket = require("ws"); const WebSocket = require("ws");
class ReconnectingWebsocket { export class ReconnectingWebsocket {
private websocket; private websocket;
private readonly url: string; private readonly url: string;
@ -17,14 +17,6 @@ class ReconnectingWebsocket {
public onclose = function () {} public onclose = function () {}
private heartbeat() {
clearTimeout(this.websocket.pingTimeout);
this.websocket.pingTimeout = setTimeout(() => {
this.websocket.terminate();
}, 3000);
}
public send(msg) { public send(msg) {
if (this.websocket && this.websocket.readyState == WebSocket.OPEN) { if (this.websocket && this.websocket.readyState == WebSocket.OPEN) {
this.websocket.send(msg); this.websocket.send(msg);
@ -37,12 +29,12 @@ class ReconnectingWebsocket {
perMessageDeflate: false perMessageDeflate: false
}); });
log("Trying to connect to the websocket server: " + this.url); console.log("Trying to connect to the websocket server: " + this.url);
const instance = this; const instance = this;
this.websocket.on("open", function open() { this.websocket.on("open", function open() {
log("Connected to the websocket server: " + instance.url); console.log("Connected to the websocket server: " + instance.url);
instance.onopen(); instance.onopen();
}); });
@ -52,15 +44,13 @@ class ReconnectingWebsocket {
this.websocket.on("error", console.error); this.websocket.on("error", console.error);
this.websocket.on("ping", this.heartbeat);
this.websocket.on("close", function close() { this.websocket.on("close", function close() {
log("The websocket connection: " + instance.url + " closed, trying reconnecting..."); console.log("The websocket connection: " + instance.url + " closed, trying reconnecting...");
instance.onclose(); instance.onclose();
setTimeout(instance.reconnect, 3000); setTimeout(() => {
instance.reconnect();
}, 3000); // TODO: 重连间隔在配置文件中实现
}); });
} }
} }
export default ReconnectingWebsocket;

View File

@ -1,6 +1,6 @@
import * as websocket from "ws"; import * as websocket from "ws";
import {PostMsgType, wsReply} from "../server"; import {PostMsgType, wsReply} from "../server";
import ReconnectingWebsocket from "../ReconnectingWebsocket"; import {ReconnectingWebsocket} from "../ReconnectingWebsocket";
const websocketList = []; const websocketList = [];

View File

@ -8,7 +8,7 @@ import {OB11Message, OB11MessageData, OB11Return} from './types';
import {actionHandlers, actionMap} from "./actions"; import {actionHandlers, actionMap} from "./actions";
import {OB11Response, OB11WebsocketResponse} from "./actions/utils"; import {OB11Response, OB11WebsocketResponse} from "./actions/utils";
import {callEvent, registerEventSender, unregisterEventSender} from "./event/manager"; import {callEvent, registerEventSender, unregisterEventSender} from "./event/manager";
import ReconnectingWebsocket from "./ReconnectingWebsocket"; import {ReconnectingWebsocket} from "./ReconnectingWebsocket";
import {ActionName} from "./actions/types"; import {ActionName} from "./actions/types";
import {OB11BaseMetaEvent} from "./event/meta/OB11BaseMetaEvent"; import {OB11BaseMetaEvent} from "./event/meta/OB11BaseMetaEvent";
import {OB11BaseNoticeEvent} from "./event/notice/OB11BaseNoticeEvent"; import {OB11BaseNoticeEvent} from "./event/notice/OB11BaseNoticeEvent";
@ -216,49 +216,52 @@ export function initWebsocket(port: number) {
}) })
} }
initReverseWebsocket().then(); initReverseWebsocket();
} }
async function initReverseWebsocket() { function initReverseWebsocket() {
const config = getConfigUtil().getConfig(); const config = getConfigUtil().getConfig();
if (config.enableWsReverse) { if (config.enableWsReverse) {
console.log("Prepare to connect all reverse websockets...");
for (const url of config.wsHosts) { for (const url of config.wsHosts) {
try { new Promise(() => {
let wsClient = new ReconnectingWebsocket(url); try {
registerEventSender(wsClient); let wsClient = new ReconnectingWebsocket(url);
registerEventSender(wsClient);
wsClient.onopen = function () { wsClient.onopen = function () {
wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT)); wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
}
wsClient.onclose = function () {
unregisterEventSender(wsClient);
}
wsClient.onmessage = async function (msg) {
let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}}
let echo = ""
log("收到正向Websocket消息", msg.toString())
try {
receiveData = JSON.parse(msg.toString())
echo = receiveData.echo
} catch (e) {
return wsReply(wsClient, OB11WebsocketResponse.error("json解析失败请检查数据格式", 1400, echo))
} }
const action: BaseAction<any, any> = actionMap.get(receiveData.action);
if (!action) { wsClient.onclose = function () {
return wsReply(wsClient, OB11WebsocketResponse.error("不支持的api " + receiveData.action, 1404, echo)) unregisterEventSender(wsClient);
} }
try {
let handleResult = await action.websocketHandle(receiveData.params, echo); wsClient.onmessage = async function (msg) {
wsReply(wsClient, handleResult) let receiveData: { action: ActionName, params: any, echo?: string } = {action: null, params: {}}
} catch (e) { let echo = ""
wsReply(wsClient, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo)) log("收到正向Websocket消息", msg.toString())
try {
receiveData = JSON.parse(msg.toString())
echo = receiveData.echo
} catch (e) {
return wsReply(wsClient, OB11WebsocketResponse.error("json解析失败请检查数据格式", 1400, echo))
}
const action: BaseAction<any, any> = actionMap.get(receiveData.action);
if (!action) {
return wsReply(wsClient, OB11WebsocketResponse.error("不支持的api " + receiveData.action, 1404, echo))
}
try {
let handleResult = await action.websocketHandle(receiveData.params, echo);
wsReply(wsClient, handleResult)
} catch (e) {
wsReply(wsClient, OB11WebsocketResponse.error(`api处理出错:${e}`, 1200, echo))
}
} }
} }
} catch (e) {
catch (e) { log(e.stack);
console.log(e); }
} }).then();
} }
} }
} }
@ -298,7 +301,6 @@ export function postMsg(msg: PostMsgType) {
} }
log("新消息事件ws上报", msg); log("新消息事件ws上报", msg);
console.log("新消息事件ws上报", msg);
callEvent(msg); callEvent(msg);
} }

View File

@ -155,13 +155,15 @@ async function onSettingWindowCreated(view: Element) {
function addHostEle(type: string, initValue: string = "") { function addHostEle(type: string, initValue: string = "") {
let addressDoc = parser.parseFromString(createHttpHostEleStr(initValue), "text/html"); let addressEle, hostItemsEle;
let addressEle = addressDoc.querySelector("setting-item")
let hostItemsEle;
if (type === "ws") { if (type === "ws") {
let addressDoc = parser.parseFromString(createWsHostEleStr(initValue), "text/html");
addressEle = addressDoc.querySelector("setting-item")
hostItemsEle = document.getElementById("wsHostItems"); hostItemsEle = document.getElementById("wsHostItems");
} }
else { else {
let addressDoc = parser.parseFromString(createHttpHostEleStr(initValue), "text/html");
addressEle = addressDoc.querySelector("setting-item")
hostItemsEle = document.getElementById("httpHostItems"); hostItemsEle = document.getElementById("httpHostItems");
} }