#!/usr/bin/env python3.12 from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks from fastapi_utils.tasks import repeat_every import time import uuid import json import logging import asyncio import traceback import random from cah.constructors import CAHClient, CAHPlayer, CAHGame from cah.websocket_conn import ConnectionManager class CAH(FastAPI): """CAH Endpoint(s)""" # TASKS async def send_heartbeats(self) -> None: try: while True: logging.critical("Heartbeat!") self.connection_manager.broadcast({ "event": "heartbeat", "ts": int(time.time()) }) await asyncio.sleep(10) except: print(traceback.format_exc()) async def clean_stale_games(self) -> None: try: logging.critical("Looking for stale games...") for game in self.games: print(f"Checking {game}...") if not game.players: logging.critical(f"{game.id} seems pretty stale! {game.resources}") self.games.remove(game) return except: print(traceback.format_exc()) async def periodicals(self) -> None: asyncio.get_event_loop().create_task(self.send_heartbeats()) asyncio.get_event_loop().create_task(self.clean_stale_games()) # END TASKS def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called self.app = app self.util = util self.constants = constants self.glob_state = glob_state self.games: list[dict] = [] self.ws_endpoints = { "cah": self.cah_handler, #tbd } self.endpoints = { #tbd if any non-WS endpoints } self.connection_manager = ConnectionManager() for endpoint, handler in self.ws_endpoints.items(): print(f"Adding websocket route: {endpoint} @ {handler}") app.add_api_websocket_route(f"/{endpoint}/", handler) for endpoint, handler in self.endpoints.items(): app.add_api_route(f"/{endpoint}/", handler, methods=["POST"]) # heartbeats = app.loop.create_task(self.send_heartbeats()) # asyncio.get_event_loop().run_until_complete(heartbeats) async def remove_player(self, game: str, player: str): try: _game = None for __game in self.games: if __game.id == game: _game = __game print(f"Got game!!!\n{_game}\nPlayers: {_game.players}") other_players_still_share_resource = False for idx, _player in enumerate(_game.players): if _player.get('handle') == player: _game.players.pop(idx) await self.connection_manager.broadcast({ 'event': 'player_left', 'ts': int(time.time()), 'data': { 'player': _player } }) # Change to broadcast to current game members only # else: # if _player.get('related_resource') == _player.related_resource: # other_players_still_share_resource = True # if not other_players_still_share_resource: # _game.resources.remove(_player.get('related_resource')) except: print(traceback.format_exc()) return { 'err': True, 'errorText': 'Server error' } async def remove_resource(self, games: list, resource: str): try: _game = None for resource_game in games: for __game in self.games: if __game.id == resource_game: _game = __game print(f"Got game!!!\n{_game}\nResources: {_game.resources}") for idx, _resource in enumerate(_game.resources): if _resource == resource: _game.resources.pop(idx) await self.connection_manager.broadcast({ 'event': 'resource_left', 'ts': int(time.time()), 'data': { 'resource': _resource, } }) # Change to broadcast to current game members only resource_obj = self.connection_manager.get_connection_by_resource_label(resource) # for player in resource_obj.players: # await self.remove_player(player.current_game, player) except: print(traceback.format_exc()) return { 'err': True, 'errorText': 'Server error' } async def join_player(self, player: CAHPlayer, game: str): joined_game = self.get_game_by_id(game) if not joined_game: return { 'err': True, 'errorText': 'Game not found', 'data': { 'requestedGame': game } } if player.current_game == joined_game: return { 'err': True, 'errorText': 'You are already here.', } joined_game.players.append(player.__dict__) await self.connection_manager.broadcast({ 'event': 'player_joined', 'ts': int(time.time()), 'data': { 'player': player.__dict__, } }) # Change to broadcast to current game members only if not player.related_resource in joined_game.resources: joined_game.resources.append(player.related_resource) return joined_game async def cah_handler(self, websocket: WebSocket): """/cah WebSocket""" await self.connection_manager.connect(websocket) await websocket.send_json({ "event": "connection_established", "ts": int(time.time()), }) try: while True: data = await websocket.receive_json() event = data.get('event') if not event: await websocket.send_json({ 'err': True, 'errorText': 'Invalid data received, closing connection.' }) return await websocket.close() print(f"Event: {event}") match event: case 'handshake': await self.cah_handshake(websocket, data) case 'refresh': await self.connection_manager.send_client_and_game_lists(self, websocket) case 'create_game': await self.create_game(websocket, data) case 'join_game': sender = self.connection_manager.get_connection_by_ws(websocket) sender_client = sender.get('client') handle = data.get('handle') game = data.get('game') player = CAHPlayer(id=str(uuid.uuid4()), current_game=game, platform=sender_client.platform, related_resource=sender_client.resource, joined_at=int(time.time()), handle=handle) joined = await self.join_player(player, game) if type(joined) == dict and joined.get('err'): await sender.get('websocket').send_json({ 'event': 'join_game_response', 'ts': int(time.time()), 'err': True, 'errorText': joined.get('errorText'), }) else: await sender.get('websocket').send_json({ 'event': 'join_game_response', 'ts': int(time.time()), 'success': True, 'data': { 'game': joined.__dict__, } }) case 'leave_game': sender = self.connection_manager.get_connection_by_ws(websocket) player_handle = data.get('handle') game = data.get('game') player = self.get_player_by_handle(game, player_handle) left = await self.remove_player(game, player) if type(left) == dict and left.get('err'): await sender.get('websocket').send_json({ 'event': 'leave_game_response', 'ts': int(time.time()), 'err': True, 'errorText': left.get('errorText'), }) else: await sender.get('websocket').send_json({ 'event': 'leave_game_response', 'ts': int(time.time()), 'success': True, }) case _: sender = self.connection_manager.get_connection_by_ws(websocket) await self.connection_manager.broadcast({ "event": "echo", "ts": int(time.time()), "from": sender.get('client').resource, "data": data, }) except WebSocketDisconnect: await self.connection_manager.disconnect(self, websocket) def get_game_by_id(self, _id: str): for game in self.games: if game.id == _id: return game return def get_player_by_id(self, game: str, player: str): game = self.get_game_by_id(game) if not game: return { 'err': True, 'errorText': f'Cannot lookup player for unknown game {game}' } for _player in game.players: if _player.id == player: return player return { 'err': True, 'errorText': f'Player w/ uuid {player} not found in {game}' } def get_player_by_handle(self, game: str, player: str): game = self.get_game_by_id(game) if not game: return { 'err': True, 'errorText': f'Cannot lookup player for unknown game {game}' } for _player in game.players: if _player.get('handle') == player: return player return { 'err': True, 'errorText': f'Player {player} not found in {game}' } def get_games(self): try: _games: list = [] for game in self.games: print(f"Adding {game}") print(f"Players: {game.players}") _games.append({ 'id': game.id, 'rounds': game.rounds, 'resources': game.resources, 'players': game.players, 'created_at': game.created_at, 'state': game.state, 'started_at': game.started_at, 'state_changed_at': game.state_changed_at, }) return _games except: print(traceback.format_exc()) async def cah_handshake(self, websocket: WebSocket, data): """Handshake""" data = data.get('data') if not data: await websocket.send_json({ "err": "WTF", }) return await websocket.close() csid = str(data.get('csid')) resource = data.get('resource') platform = data.get('platform') if not csid in self.constants.VALID_CSIDS: await websocket.send_json({ "err": "Unauthorized", }) return await websocket.close() client = CAHClient( resource=resource, platform=platform, csid=csid, connected_at=int(time.time()), players=[], games=[], ) await self.connection_manager.handshake_complete(self, websocket, csid, client) await websocket.send_json({ "event": "handshake_response", "ts": int(time.time()), "data": { "success": True, "resource": resource, "platform": platform, "games": self.get_games(), }, }) async def create_game(self, websocket, data: str): data = data.get('data') if not self.connection_manager.get_connection_by_ws(websocket).get('client'): # No client set, valid handshake not completed return await websocket.send_json({ "event": "create_game_response", "err": True, "errorText": "Unauthorized", }) if not data.get('rounds') or not str(data.get('rounds')).isnumeric() or not data.get('creator_handle'): return await websocket.send_json({ "event": "create_game_response", "ts": int(time.time()), "data": { "success": False, "errorText": "Invalid data", "recvdData": data, } }) if len(self.games): await websocket.send_json({ 'event': 'create_game_response', 'ts': int(time.time()), 'data': { 'err': True, 'errorText': 'A game already exists' # One game limit } }) else: client = self.connection_manager.get_connection_by_ws(websocket).get('client') rounds = int(data.get('rounds')) game_uuid = str(uuid.uuid4()) creator_handle = data.get('creator_handle') creator = CAHPlayer(id=str(uuid.uuid4()), current_game=game_uuid, platform=client.platform, related_resource=client.resource, joined_at=int(time.time()), handle=creator_handle, ) game = CAHGame(id=game_uuid, rounds=rounds, resources=[client.resource,], players=[creator.__dict__,], created_at=int(time.time()), state=-1, started_at=0, state_changed_at=int(time.time())) await websocket.send_json({ "event": "create_game_response", "ts": int(time.time()), "data": { "success": True, "createdGame": game.__dict__, } }) await self.connection_manager.broadcast({ 'event': 'game_created', 'ts': int(time.time()), 'data': { 'game': game.__dict__, } }) client.games.append(game) self.games.append(game) await self.connection_manager.send_client_and_game_lists(self, websocket)