#!/usr/bin/env python3.12
import os
import aiosqlite as sqlite3
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
class Transcriptions(FastAPI):
"""
Transcription Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = util
self.constants = constants
self.endpoints: dict = {
"transcriptions/get_episodes": self.get_episodes_handler,
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
#tbd
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=True)
async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> JSONResponse:
"""
Get list of episodes by show id
"""
show_id: int = data.s
db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None
show_title: Optional[str] = None
if not show_id:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Invalid request',
})
show_id = int(show_id)
if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Show not found.',
})
match show_id:
case 0:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "sp.db")
db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
show_title = "South Park"
case 1:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "futur.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Futurama"
case 2:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "parks.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Parks And Rec"
case _:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Unknown error.',
})
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: list[tuple] = await _cursor.fetchall()
return JSONResponse(content={
"show_title": show_title,
"episodes": [
{
'id': item[1],
'ep_friendly': item[0],
} for item in result],
})
async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> JSONResponse:
"""
Get lines for a particular episode
"""
show_id: int = int(data.s)
episode_id: int = int(data.e)
# pylint: disable=line-too-long
match show_id:
case 0:
db_path: Union[str, LiteralString] = os.path.join("/usr/local/share",
"sqlite_dbs", "sp.db")
db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "futur.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "
Opener: " || EP_OPENER || ""), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
case 2:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "parks.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
case _:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Unknown error',
})
async with sqlite3.connect(database=db_path,
timeout=1) as _db:
params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor:
result: list[tuple] = await _cursor.fetchall()
first_result: tuple = result[0]
return JSONResponse(content={
'episode_id': episode_id,
'ep_friendly': first_result[0].strip(),
'lines': [
{
'speaker': item[1].strip(),
'line': item[2].strip(),
} for item in result],
})