#!/usr/bin/env python3.12 import os import aiosqlite as sqlite3 from fastapi import FastAPI from fastapi.responses import JSONResponse from typing import Optional, LiteralString, Union from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest class Transcriptions(FastAPI): """ Transcription Endpoints """ def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util self.constants = constants self.endpoints: dict = { "transcriptions/get_episodes": self.get_episodes_handler, "transcriptions/get_episode_lines": self.get_episode_lines_handler, #tbd } for endpoint, handler in self.endpoints.items(): app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=True) async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> JSONResponse: """ Get list of episodes by show id - **s**: Show ID to query """ show_id: int = data.s db_path: Optional[Union[str, LiteralString]] = None db_query: Optional[str] = None show_title: Optional[str] = None if not show_id: return JSONResponse(status_code=500, content={ 'err': True, 'errorText': 'Invalid request', }) show_id = int(show_id) if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]: return JSONResponse(status_code=500, content={ 'err': True, 'errorText': 'Show not found.', }) match show_id: case 0: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db") db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode""" show_title = "South Park" case 1: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db") db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP""" show_title = "Futurama" case 2: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db") db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP""" show_title = "Parks And Rec" case _: return JSONResponse(status_code=500, content={ 'err': True, 'errorText': 'Unknown error.', }) async with sqlite3.connect(database=db_path, timeout=1) as _db: async with await _db.execute(db_query) as _cursor: result: list[tuple] = await _cursor.fetchall() return JSONResponse(content={ "show_title": show_title, "episodes": [ { 'id': item[1], 'ep_friendly': item[0], } for item in result], }) async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> JSONResponse: """ Get lines for a particular episode - **s**: Show ID to query - **e**: Episode ID to query """ show_id: int = int(data.s) episode_id: int = int(data.e) match show_id: case 0: db_path: Union[str, LiteralString] = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db") db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?""" case 1: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db") db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "
Opener: " || EP_OPENER || ""), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC""" case 2: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db") db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC""" case _: return JSONResponse(status_code=500, content={ 'err': True, 'errorText': 'Unknown error', }) async with sqlite3.connect(database=db_path, timeout=1) as _db: params: tuple = (episode_id,) async with await _db.execute(db_query, params) as _cursor: result: list[tuple] = await _cursor.fetchall() first_result: tuple = result[0] return JSONResponse(content={ 'episode_id': episode_id, 'ep_friendly': first_result[0].strip(), 'lines': [ { 'speaker': item[1].strip(), 'line': item[2].strip(), } for item in result], })