#!/usr/bin/env python3.12 import os import aiosqlite as sqlite3 from fastapi import FastAPI from typing import Optional, LiteralString from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest class Transcriptions(FastAPI): """Transcription Endpoints""" def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called self.app = app self.util = util self.constants = constants self.endpoints: dict = { "transcriptions/get_episodes": self.get_episodes_handler, "transcriptions/get_episode_lines": self.get_episode_lines_handler, #tbd } for endpoint, handler in self.endpoints.items(): app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=False) async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> dict: """Get list of episodes by show id""" show_id: int = data.s db_path: Optional[str|LiteralString] = None db_query: Optional[str] = None show_title: Optional[str] = None if show_id is None: return { 'err': True, 'errorText': 'Invalid request', } show_id = int(show_id) if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]: return { 'err': True, 'errorText': 'Show not found.', } match show_id: case 0: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db") db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode""" show_title = "South Park" case 1: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db") db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP""" show_title = "Futurama" case 2: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db") db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP""" show_title = "Parks And Rec" case _: return { 'err': True, 'errorText': 'Unknown error.' } async with sqlite3.connect(database=db_path, timeout=1) as _db: async with await _db.execute(db_query) as _cursor: result: list[tuple] = await _cursor.fetchall() return { "show_title": show_title, "episodes": [ { 'id': item[1], 'ep_friendly': item[0] } for item in result] } async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> dict: """Get lines for a particular episode""" show_id: int = data.s episode_id: int = data.e # pylint: disable=line-too-long match show_id: case 0: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db") db_query = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?""" case 1: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db") db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "
Opener: " || EP_OPENER || ""), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC""" case 2: db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db") db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC""" case _: return { 'err': True, 'errorText': 'Unknown error' } async with sqlite3.connect(database=db_path, timeout=1) as _db: params: tuple = (episode_id,) async with await _db.execute(db_query, params) as _cursor: result: list[tuple] = await _cursor.fetchall() first_result: tuple = result[0] return { 'episode_id': episode_id, 'ep_friendly': first_result[0].strip(), 'lines': [ { 'speaker': item[1].strip(), 'line': item[2].strip() } for item in result] }