#!/usr/bin/env python3.12
"""Transcription endpoints: episode lists and episode dialog per show."""

import os

import aiosqlite as sqlite3
from fastapi import FastAPI
from pydantic import BaseModel

# Directory holding the per-show SQLite databases.
_DB_DIR = os.path.join("/", "var", "lib", "singerdbs")

# Per-show configuration, keyed by the numeric show id sent by clients.
# Both handlers resolve the show through this table so they cannot drift
# apart on which ids are valid or which database they open.
#
# NOTE(review): the queries use double-quoted string literals ("S", " "),
# which SQLite only accepts through its legacy fallback for unknown
# identifiers. They are reproduced verbatim; consider single quotes.
_SHOWS = {
    0: {
        "title": "South Park",
        "db_path": os.path.join(_DB_DIR, "sp.db"),
        "episodes_query": (
            'SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID '
            'FROM SP_DAT ORDER BY Season, Episode'
        ),
        "lines_query": (
            'SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line '
            'FROM SP_DAT WHERE ID = ?'
        ),
    },
    1: {
        "title": "Futurama",
        "db_path": os.path.join(_DB_DIR, "futur.db"),
        "episodes_query": (
            'SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID '
            'FROM clean_dialog ORDER BY EP_S, EP_EP'
        ),
        # The title expression embeds a real newline before "Opener: ".
        "lines_query": (
            'SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "\nOpener: " '
            '|| EP_OPENER || ""), EP_LINE_SPEAKER, EP_LINE '
            'FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC'
        ),
    },
    2: {
        "title": "Parks And Rec",
        "db_path": os.path.join(_DB_DIR, "parks.db"),
        "episodes_query": (
            'SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID '
            'FROM clean_dialog ORDER BY EP_S, EP_EP'
        ),
        "lines_query": (
            'SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE '
            'FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC'
        ),
    },
}


def _clean(value):
    """Return *value* stripped of surrounding whitespace; None becomes ''."""
    # SQL NULL arrives as None, and `||` yields NULL if any operand is NULL.
    return "" if value is None else value.strip()


class ValidShowEpisodeListRequest(BaseModel):
    """
    - **s**: show id
    """

    s: int


class ValidShowEpisodeLineRequest(BaseModel):
    """
    - **s**: show id
    - **e**: episode id
    """

    s: int
    e: int


class Transcriptions(FastAPI):
    """Transcription Endpoints"""

    # The handlers deliberately carry no return annotation: FastAPI would
    # otherwise use the annotation as the route's response model.

    def __init__(self, app: FastAPI, util, constants, glob_state):  # pylint: disable=super-init-not-called
        """Store the collaborators and register the POST routes on *app*."""
        self.app = app
        self.util = util
        self.constants = constants
        self.glob_state = glob_state

        self.endpoints = {
            "transcriptions/get_episodes": self.get_episodes_handler,
            "transcriptions/get_episode_lines": self.get_episode_lines_handler,
            # tbd
        }

        for endpoint, handler in self.endpoints.items():
            app.add_api_route(f"/{endpoint}/", handler, methods=["POST"])

    async def get_episodes_handler(self, data: ValidShowEpisodeListRequest):
        """
        /transcriptions/get_episodes/
        Get list of episodes by show id
        """
        # `data.s` is already a validated int, so a table lookup is the only
        # check needed; ids outside the table are reported as not found.
        show = _SHOWS.get(data.s)
        if show is None:
            return {
                'err': True,
                'errorText': 'Show not found.',
            }

        await self.glob_state.increment_counter('transcript_list_requests')

        async with sqlite3.connect(database=show["db_path"], timeout=1) as _db:
            async with _db.execute(show["episodes_query"]) as _cursor:
                rows = await _cursor.fetchall()

        # Each row is (friendly title, episode id).
        return {
            "show_title": show["title"],
            "episodes": [
                {
                    'id': episode_id,
                    'ep_friendly': friendly,
                }
                for friendly, episode_id in rows
            ],
        }

    async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest):
        """
        /transcriptions/get_episode_lines/
        Get lines for a particular episode
        """
        show = _SHOWS.get(data.s)
        if show is None:
            return {
                'err': True,
                'errorText': 'Show not found.',
            }
        episode_id = data.e

        await self.glob_state.increment_counter('transcript_requests')

        async with sqlite3.connect(database=show["db_path"], timeout=1) as _db:
            async with _db.execute(show["lines_query"], (episode_id,)) as _cursor:
                rows = await _cursor.fetchall()

        # An unknown episode id yields no rows; report it instead of
        # failing on rows[0].
        if not rows:
            return {
                'err': True,
                'errorText': 'Episode not found.',
            }

        # Each row is (friendly title, speaker, line); the title repeats on
        # every row, so it is taken from the first one.
        return {
            'episode_id': episode_id,
            'ep_friendly': _clean(rows[0][0]),
            'lines': [
                {
                    'speaker': _clean(speaker),
                    'line': _clean(line),
                }
                for _, speaker, line in rows
            ],
        }