import asyncio import json import os import secrets import ssl import websockets from django.conf import settings from urllib.parse import quote from .models import Gateway class GatewayConnection: _ssl_context: ssl.SSLContext = None def __init__(self, host: str, port: int): if settings.CONNECTION_GATEWAY_AUTH_KEY and not GatewayConnection._ssl_context: ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) ctx.load_cert_chain( os.path.realpath(settings.CONNECTION_GATEWAY_AUTH_CERTIFICATE), os.path.realpath(settings.CONNECTION_GATEWAY_AUTH_KEY), ) if settings.CONNECTION_GATEWAY_AUTH_CA: ctx.load_verify_locations( cafile=os.path.realpath(settings.CONNECTION_GATEWAY_AUTH_CA), ) ctx.verify_mode = ssl.CERT_REQUIRED GatewayConnection._ssl_context = ctx proto = "wss" if GatewayConnection._ssl_context else "ws" self.url = f"{proto}://localhost:9000/connect/{quote(host)}:{quote(str(port))}" async def connect(self): self.context = websockets.connect(self.url, ssl=GatewayConnection._ssl_context) try: self.socket = await self.context.__aenter__() except OSError: raise ConnectionError() async def send(self, data): await self.socket.send(data) def recv(self, timeout=None): return asyncio.wait_for(self.socket.recv(), timeout) async def close(self): await self.socket.close() await self.context.__aexit__(None, None, None) class GatewayAdminConnection: _ssl_context: ssl.SSLContext = None def __init__(self, gateway: Gateway): if not settings.CONNECTION_GATEWAY_AUTH_KEY: raise RuntimeError( "CONNECTION_GATEWAY_AUTH_KEY is required to manage connection gateways" ) if not GatewayAdminConnection._ssl_context: ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) ctx.load_cert_chain( os.path.realpath(settings.CONNECTION_GATEWAY_AUTH_CERTIFICATE), os.path.realpath(settings.CONNECTION_GATEWAY_AUTH_KEY), ) if settings.CONNECTION_GATEWAY_AUTH_CA: ctx.load_verify_locations( cafile=os.path.realpath(settings.CONNECTION_GATEWAY_AUTH_CA), ) ctx.verify_mode = ssl.CERT_REQUIRED GatewayAdminConnection._ssl_context = ctx self.url = f"wss://{gateway.host}:{gateway.admin_port}" async def connect(self): self.context = websockets.connect( self.url, ssl=GatewayAdminConnection._ssl_context ) try: self.socket = await self.context.__aenter__() except OSError: raise ConnectionError() async def authorize_client(self) -> str: token = secrets.token_hex(32) await self.send( json.dumps( { "_": "authorize-client", "token": token, } ) ) return token async def send(self, data): await self.socket.send(data) async def close(self): await self.socket.close() await self.context.__aexit__(None, None, None)