api/endpoints/transcriptions.py
2024-12-02 11:04:24 -05:00

135 lines
5.1 KiB
Python

#!/usr/bin/env python3.12
import os
import aiosqlite as sqlite3
from fastapi import FastAPI
from pydantic import BaseModel
class ValidShowEpisodeListRequest(BaseModel):
"""
- **s**: show id
"""
s: int
class ValidShowEpisodeLineRequest(BaseModel):
"""
- **s**: show id
- **e**: episode id
"""
s: int
e: int
class Transcriptions(FastAPI):
"""Transcription Endpoints"""
def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
self.app = app
self.util = util
self.constants = constants
self.glob_state = glob_state
self.endpoints = {
"transcriptions/get_episodes": self.get_episodes_handler,
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
#tbd
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(f"/{endpoint}/", handler, methods=["POST"])
async def get_episodes_handler(self, data: ValidShowEpisodeListRequest):
"""
/transcriptions/get_episodes/
Get list of episodes by show id
"""
show_id = data.s
db_path = None
db_query = None
show_title = None
if show_id is None:
return {
'err': True,
'errorText': 'Invalid request'
}
show_id = int(show_id)
if not(str(show_id).isnumeric()) or not(show_id in [0, 1, 2]):
return {
'err': True,
'errorText': 'Show not found.'
}
match show_id:
case 0:
db_path = os.path.join("/", "var", "lib", "singerdbs", "sp.db")
db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
show_title = "South Park"
case 1:
db_path = os.path.join("/", "var", "lib", "singerdbs", "futur.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Futurama"
case 2:
db_path = os.path.join("/", "var", "lib", "singerdbs", "parks.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Parks And Rec"
case _:
return {
'err': True,
'errorText': 'Unknown error.'
}
await self.glob_state.increment_counter('transcript_list_requests')
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with _db.execute(db_query) as _cursor:
result = await _cursor.fetchall()
return {
"show_title": show_title,
"episodes": [
{
'id': item[1],
'ep_friendly': item[0]
} for item in result]
}
async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest):
"""/transcriptions/get_episode_lines/
Get lines for a particular episode
"""
show_id = data.s
episode_id = data.e
match show_id:
case 0:
db_path = os.path.join("/", "var", "lib", "singerdbs", "sp.db")
db_query = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
db_path = os.path.join("/", "var", "lib", "singerdbs", "futur.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
case 2:
db_path = os.path.join("/", "var", "lib", "singerdbs", "parks.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
case _:
return {
'err': True,
'errorText': 'Unknown error'
}
await self.glob_state.increment_counter('transcript_requests')
async with sqlite3.connect(database=db_path, timeout=1) as _db:
params = (episode_id,)
async with _db.execute(db_query, params) as _cursor:
result = await _cursor.fetchall()
first_result = result[0]
return {
'episode_id': episode_id,
'ep_friendly': first_result[0].strip(),
'lines': [
{
'speaker': item[1].strip(),
'line': item[2].strip()
} for item in result]
}