import os
import aiosqlite as sqlite3
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
class Transcriptions(FastAPI):
    """
    Transcription Endpoints

    Registers POST routes on the supplied FastAPI ``app`` that serve episode
    lists and per-episode dialogue read from per-show SQLite databases.
    """

    # Directory that holds the per-show SQLite database files.
    _DB_DIR: str = os.path.join("/usr/local/share", "sqlite_dbs")

    # Show ID -> (show title, database filename, episode-list query,
    # episode-lines query). This is the single source of truth for which show
    # IDs are valid. SQL string literals are single-quoted: SQLite only accepts
    # double-quoted string literals as a legacy quirk that can be compiled out.
    _SHOWS: dict[int, tuple[str, str, str, str]] = {
        0: (
            "South Park",
            "sp.db",
            "SELECT DISTINCT(('S' || Season || 'E' || Episode || ' ' || Title)), ID "
            "FROM SP_DAT ORDER BY Season, Episode",
            "SELECT ('S' || Season || 'E' || Episode || ' ' || Title), Character, Line "
            "FROM SP_DAT WHERE ID = ?",
        ),
        1: (
            "Futurama",
            "futur.db",
            "SELECT DISTINCT(('S' || EP_S || 'E' || EP_EP || ' ' || EP_TITLE)), EP_ID "
            "FROM clean_dialog ORDER BY EP_S, EP_EP",
            # The title column embeds a newline followed by the episode opener.
            "SELECT ('S' || EP_S || 'E' || EP_EP || ' ' || EP_TITLE || '\n"
            "Opener: ' || EP_OPENER || ''), EP_LINE_SPEAKER, EP_LINE "
            "FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC",
        ),
        2: (
            "Parks And Rec",
            "parks.db",
            "SELECT DISTINCT(('S' || EP_S || 'E' || EP_EP || ' ' || EP_TITLE)), EP_ID "
            "FROM clean_dialog ORDER BY EP_S, EP_EP",
            "SELECT ('S' || EP_S || 'E' || EP_EP || ' ' || EP_TITLE), EP_LINE_SPEAKER, EP_LINE "
            "FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC",
        ),
    }

    def __init__(self, app: FastAPI, util, constants) -> None:
        """
        Store the collaborators and register this class's routes on ``app``.

        - **app**: FastAPI application the routes are added to
        - **util**: stored on the instance as ``self.util``
        - **constants**: stored on the instance as ``self.constants``
        """
        # NOTE: the FastAPI base initializer is not invoked here; routes are
        # registered on the ``app`` that was passed in, not on this instance.
        self.app: FastAPI = app
        self.util = util
        self.constants = constants
        self.endpoints: dict = {
            "transcriptions/get_episodes": self.get_episodes_handler,
            "transcriptions/get_episode_lines": self.get_episode_lines_handler,
            # tbd
        }
        for endpoint, handler in self.endpoints.items():
            app.add_api_route(
                f"/{endpoint}", handler, methods=["POST"], include_in_schema=True
            )

    @staticmethod
    def _error_response(error_text: str) -> JSONResponse:
        """Build the JSON error payload (HTTP 500) shared by all handlers."""
        return JSONResponse(
            status_code=500,
            content={
                "err": True,
                "errorText": error_text,
            },
        )

    @staticmethod
    def _clean(value) -> str:
        """Return ``value`` as stripped text, mapping SQL NULL to ``""``."""
        return "" if value is None else str(value).strip()

    async def get_episodes_handler(
        self, data: ValidShowEpisodeListRequest
    ) -> JSONResponse:
        """
        Get list of episodes by show id
        - **s**: Show ID to query
        """
        show_id = data.s
        if not isinstance(show_id, int):
            return self._error_response("Invalid request")

        show = self._SHOWS.get(show_id)
        if show is None:
            return self._error_response("Show not found.")
        show_title, db_file, episodes_query, _ = show

        db_path = os.path.join(self._DB_DIR, db_file)
        async with sqlite3.connect(database=db_path, timeout=1) as _db:
            async with await _db.execute(episodes_query) as _cursor:
                result = await _cursor.fetchall()

        # Each row is (friendly episode label, episode id).
        return JSONResponse(
            content={
                "show_title": show_title,
                "episodes": [
                    {
                        "id": item[1],
                        "ep_friendly": item[0],
                    }
                    for item in result
                ],
            }
        )

    async def get_episode_lines_handler(
        self, data: ValidShowEpisodeLineRequest
    ) -> JSONResponse:
        """
        Get lines for a particular episode
        - **s**: Show ID to query
        - **e**: Episode ID to query
        """
        show_id: int = int(data.s)
        episode_id: int = int(data.e)

        show = self._SHOWS.get(show_id)
        if show is None:
            return self._error_response("Unknown error")
        _, db_file, _, lines_query = show

        db_path = os.path.join(self._DB_DIR, db_file)
        async with sqlite3.connect(database=db_path, timeout=1) as _db:
            # The episode id is bound as a parameter, never interpolated.
            async with await _db.execute(lines_query, (episode_id,)) as _cursor:
                result = await _cursor.fetchall()

        # An unknown episode id yields no rows; report it instead of
        # failing on result[0].
        if not result:
            return self._error_response("Episode not found.")

        # Each row is (friendly episode label, speaker, line); the label is
        # the same on every row, so it is taken from the first one.
        return JSONResponse(
            content={
                "episode_id": episode_id,
                "ep_friendly": self._clean(result[0][0]),
                "lines": [
                    {
                        "speaker": self._clean(item[1]),
                        "line": self._clean(item[2]),
                    }
                    for item in result
                ],
            }
        )