diff --git a/base.py b/base.py
index c86a41b..14e7dbd 100644
--- a/base.py
+++ b/base.py
@@ -103,9 +103,6 @@ routes: dict = {
"randmsg": importlib.import_module("endpoints.rand_msg").RandMsg(
app, util, constants
),
- "transcriptions": importlib.import_module(
- "endpoints.transcriptions"
- ).Transcriptions(app, util, constants),
"lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch(
app, util, constants
),
diff --git a/endpoints/transcriptions.py b/endpoints/transcriptions.py
deleted file mode 100644
index 37d7874..0000000
--- a/endpoints/transcriptions.py
+++ /dev/null
@@ -1,171 +0,0 @@
-import os
-import aiosqlite as sqlite3
-from fastapi import FastAPI, Depends, Response
-from fastapi_throttle import RateLimiter
-from fastapi.responses import JSONResponse
-from typing import Optional, LiteralString, Union, Iterable, cast
-from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
-
-
-class Transcriptions(FastAPI):
- """
- Transcription Endpoints
- """
-
- def __init__(self, app: FastAPI, util, constants) -> None:
- """Initialize Transcriptions endpoints."""
- self.app: FastAPI = app
- self.util = util
- self.constants = constants
-
- self.endpoints: dict = {
- "transcriptions/get_episodes": self.get_episodes_handler,
- "transcriptions/get_episode_lines": self.get_episode_lines_handler,
- # tbd
- }
-
- for endpoint, handler in self.endpoints.items():
- app.add_api_route(
- f"/{endpoint}",
- handler,
- methods=["POST"],
- include_in_schema=True,
- dependencies=[Depends(RateLimiter(times=2, seconds=2))],
- )
-
- async def get_episodes_handler(
- self, data: ValidShowEpisodeListRequest
- ) -> JSONResponse:
- """
- Get list of episodes by show ID.
-
- Parameters:
- - **data** (ValidShowEpisodeListRequest): Request containing show ID.
-
- Returns:
- - **JSONResponse**: Contains a list of episodes.
- """
- show_id: int = data.s
- db_path: Optional[Union[str, LiteralString]] = None
- db_query: Optional[str] = None
- show_title: Optional[str] = None
-
- if not isinstance(show_id, int):
- return JSONResponse(
- status_code=500,
- content={
- "err": True,
- "errorText": "Invalid request",
- },
- )
-
- show_id = int(show_id)
-
- if not (str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
- return JSONResponse(
- status_code=500,
- content={
- "err": True,
- "errorText": "Show not found.",
- },
- )
-
- match show_id:
- case 0:
- db_path = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db")
- db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
- show_title = "South Park"
- case 1:
- db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
- db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
- show_title = "Futurama"
- case 2:
- db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db")
- db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
- show_title = "Parks And Rec"
- case _:
- return JSONResponse(
- status_code=500,
- content={
- "err": True,
- "errorText": "Unknown error.",
- },
- )
-
- async with sqlite3.connect(database=db_path, timeout=1) as _db:
- async with await _db.execute(db_query) as _cursor:
- result: Iterable[sqlite3.Row] = await _cursor.fetchall()
- return JSONResponse(
- content={
- "show_title": show_title,
- "episodes": [
- {
- "id": item[1],
- "ep_friendly": item[0],
- }
- for item in result
- ],
- }
- )
-
- async def get_episode_lines_handler(
- self, data: ValidShowEpisodeLineRequest
- ) -> Response:
- """
- Get lines for a particular episode.
-
- Parameters:
- - **data** (ValidShowEpisodeLineRequest): Request containing show and episode ID.
-
- Returns:
- - **Response**: Episode lines.
- """
- show_id: int = int(data.s)
- episode_id: int = int(data.e)
-
- match show_id:
- case 0:
- db_path: Union[str, LiteralString] = os.path.join(
- "/usr/local/share", "sqlite_dbs", "sp.db"
- )
- db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
- case 1:
- db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
- db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "
Opener: " || EP_OPENER || ""), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
- case 2:
- db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db")
- db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
-
- case _:
- return JSONResponse(
- status_code=500,
- content={
- "err": True,
- "errorText": "Unknown error",
- },
- )
-
- async with sqlite3.connect(database=db_path, timeout=1) as _db:
- params: tuple = (episode_id,)
- async with await _db.execute(db_query, params) as _cursor:
- result: Iterable[sqlite3.Row] = await _cursor.fetchall()
- result_list = cast(list[sqlite3.Row], result)
- if not result_list:
- return Response(
- status_code=404,
- content="Not found",
- )
- first_result: sqlite3.Row = result_list[0]
- return JSONResponse(
- content={
- "episode_id": episode_id,
- "ep_friendly": first_result[0].strip(),
- "lines": [
- {
- "speaker": item[1].strip(),
- "line": item[2].strip(),
- }
- for item in result
- ],
- }
- )