#!/usr/bin/env python3.12
# pylint: disable=invalid-name
"""XC (CrossComm) endpoint: forwards authorised commands to a bot's HTTP API."""

import asyncio
import logging

from aiohttp import ClientError, ClientSession, ClientTimeout
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel

from .constructors import ValidXCRequest

logger = logging.getLogger(__name__)

# Bot id -> "host:port" of that bot's HTTP API.
BID_ADDR_MAP = {
    0: '10.10.10.101:5991',  # Patrick (a.k.a. Thomas a.k.a. Aces)
    # TODO: add Havoc?
}


class XC(FastAPI):
    """XC (CrossComm) Endpoints

    Registers the ``/xc`` POST route on the FastAPI ``app`` passed to the
    constructor. Although this class subclasses ``FastAPI``, its own
    ``FastAPI.__init__`` is intentionally not called (see the pylint
    suppression on ``__init__``); routes live on the supplied ``app``.
    """

    def __init__(self, app: FastAPI, util, constants, glob_state):  # pylint: disable=super-init-not-called
        """Store the collaborators and register every endpoint on ``app``.

        Args:
            app: Application the routes are added to.
            util: Helper object; must provide ``check_key(path=, req_type=, key=)``.
            constants: Stored on the instance; not used by this class itself.
            glob_state: Stored on the instance; not used by this class itself.
        """
        self.app = app
        self.util = util
        self.constants = constants
        self.glob_state = glob_state

        self.endpoints = {
            "xc": self.xc_handler,
        }

        for endpoint, handler in self.endpoints.items():
            app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=False)

    # async def put_ws_handler(self, ws: WebSocket):
    #     await ws.accept()
    #     await self.audio_streamer.handle_client(ws)

    async def xc_handler(self, data: ValidXCRequest, request: Request):
        """
        /xc
        Handle XC Commands

        Validates the caller's key, looks up the target bot by ``data.bid``
        and POSTs ``data.data`` as JSON to ``http://<bot address>/<data.cmd>``.

        Args:
            data: Request body; ``key``, ``bid``, ``cmd`` and ``data`` are read.
            request: Incoming request; only its URL path is used (key check).

        Returns:
            ``{'success': True, 'response': <bot JSON>}`` on success, or
            ``{'err': True, 'errorText': ...}`` when the bot id is unknown or
            the request to the bot fails.

        Raises:
            HTTPException: 403 when ``util.check_key`` rejects the key.
        """
        # Raised outside any try block so the 403 actually reaches the client
        # instead of being swallowed and turned into a `null` 200 response.
        if not self.util.check_key(path=request.url.path, req_type=0, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")

        bid = data.bid
        if bid not in BID_ADDR_MAP:
            return {
                'err': True,
                'errorText': 'Invalid bot id'
            }

        bot_api_url = f'http://{BID_ADDR_MAP[bid]}/'
        target_url = f"{bot_api_url}{data.cmd}"

        try:
            async with ClientSession() as session:
                async with session.post(
                    target_url,
                    json=data.data,
                    headers={
                        'Content-Type': 'application/json; charset=utf-8'
                    },
                    timeout=ClientTimeout(connect=5, sock_read=5),
                ) as bot_response:
                    response = await bot_response.json()
        except (ClientError, asyncio.TimeoutError, ValueError) as exc:
            # Connection/timeout problems and non-JSON replies are expected
            # failure modes of the bot: report them to the caller rather than
            # silently returning nothing.
            logger.warning("XC request to bot %s at %s failed: %r", bid, target_url, exc)
            return {
                'err': True,
                'errorText': 'Bot request failed'
            }

        return {
            'success': True,
            'response': response
        }