api/endpoints/transcriptions.py
2025-02-18 14:37:37 -05:00

128 lines
5.7 KiB
Python

import os
import aiosqlite as sqlite3
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
class Transcriptions(FastAPI):
"""
Transcription Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None:
self.app: FastAPI = app
self.util = util
self.constants = constants
self.endpoints: dict = {
"transcriptions/get_episodes": self.get_episodes_handler,
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
#tbd
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=True)
async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> JSONResponse:
"""
Get list of episodes by show id
- **s**: Show ID to query
"""
show_id: int = data.s
db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None
show_title: Optional[str] = None
if not show_id:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Invalid request',
})
show_id = int(show_id)
if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Show not found.',
})
match show_id:
case 0:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "sp.db")
db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
show_title = "South Park"
case 1:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "futur.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Futurama"
case 2:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "parks.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Parks And Rec"
case _:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Unknown error.',
})
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: list[tuple] = await _cursor.fetchall()
return JSONResponse(content={
"show_title": show_title,
"episodes": [
{
'id': item[1],
'ep_friendly': item[0],
} for item in result],
})
async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> JSONResponse:
"""
Get lines for a particular episode
- **s**: Show ID to query
- **e**: Episode ID to query
"""
show_id: int = int(data.s)
episode_id: int = int(data.e)
match show_id:
case 0:
db_path: Union[str, LiteralString] = os.path.join("/usr/local/share",
"sqlite_dbs", "sp.db")
db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "futur.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
case 2:
db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "parks.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
case _:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Unknown error',
})
async with sqlite3.connect(database=db_path,
timeout=1) as _db:
params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor:
result: list[tuple] = await _cursor.fetchall()
first_result: tuple = result[0]
return JSONResponse(content={
'episode_id': episode_id,
'ep_friendly': first_result[0].strip(),
'lines': [
{
'speaker': item[1].strip(),
'line': item[2].strip(),
} for item in result],
})