#!/usr/bin/env python3.12 from fastapi import FastAPI, Request, HTTPException from pydantic import BaseModel from aiohttp import ClientSession, ClientTimeout class ValidXCRequest(BaseModel): """ - **key**: valid XC API key - **bid**: bot id - **cmd**: bot command - **data**: command data """ key: str bid: int cmd: str data: dict | None = None class XC(FastAPI): """XC (CrossComm) Endpoints""" def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called self.app = app self.util = util self.constants = constants self.glob_state = glob_state self.endpoints = { "xc": self.xc_handler, #tbd } for endpoint, handler in self.endpoints.items(): app.add_api_route(f"/{endpoint}/", handler, methods=["POST"]) async def xc_handler(self, data: ValidXCRequest, request: Request): """ /xc/ Handle XC Commands """ key = data.key bid = data.bid cmd = data.cmd cmd_data = data.data req_type = 0 if bid in [1]: req_type = 2 if not self.util.check_key(path=request.url.path, req_type=req_type, key=key): raise HTTPException(status_code=403, detail="Unauthorized") BID_ADDR_MAP = { 0: '10.10.10.101:5991', # Thomas/Aces 1: '10.10.10.100:5992' # MS & Waleed Combo } if not bid in BID_ADDR_MAP.keys(): return { 'err': True, 'errorText': 'Invalid bot id' } bot_api_url = f'http://{BID_ADDR_MAP[bid]}/' async with ClientSession() as session: async with session.post(f"{bot_api_url}{cmd}", json=cmd_data, headers={ 'Content-Type': 'application/json; charset=utf-8' }, timeout=ClientTimeout(connect=5, sock_read=5)) as request: response = await request.json() return { 'success': True, 'response': response }