import logging import os import urllib.parse import regex import aiosqlite as sqlite3 from fastapi import FastAPI, HTTPException, Depends from fastapi_throttle import RateLimiter from fastapi.responses import JSONResponse from typing import LiteralString, Optional, Union, Iterable from regex import Pattern from .constructors import ValidTypeAheadRequest, ValidLyricRequest from lyric_search.constructors import LyricsResult from lyric_search.sources import aggregate from lyric_search import notifier class CacheUtils: """ Lyrics Cache DB Utils """ def __init__(self) -> None: self.lyrics_db_path: LiteralString = os.path.join( "/usr/local/share", "sqlite_dbs", "cached_lyrics.db" ) async def check_typeahead(self, query: str) -> Optional[list[str]]: """Lyric Search Typeahead DB Handler""" if not query: return None async with sqlite3.connect(self.lyrics_db_path, timeout=1) as _db: _db.row_factory = sqlite3.Row db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\ (TRIM(artist) || " - " || TRIM(song)) as ret FROM lyrics WHERE\ ret LIKE ? LIMIT 10""" db_params: tuple[str] = (f"%%%{query}%%%",) async with _db.execute(db_query, db_params) as _cursor: result: Iterable[sqlite3.Row] = await _cursor.fetchall() out_result = [str(r["ret"]) for r in result] return out_result class LyricSearch(FastAPI): """ Lyric Search Endpoint """ def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util self.constants = constants self.cache_utils = CacheUtils() self.notifier = notifier.DiscordNotifier() self.endpoints: dict = { "typeahead/lyrics": self.typeahead_handler, "lyric_search": self.lyric_search_handler, # Preserving old endpoint path temporarily "lyric/search": self.lyric_search_handler, } self.acceptable_request_sources: list = [ "WEB", "WEB-RADIO", "DISC-HAVOC", "LIMNORIA-SHARED", "IRC-SHARED", ] self.lrc_regex: Pattern = regex.compile( r"\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}" ) for endpoint, handler in self.endpoints.items(): _schema_include = endpoint in ["lyric/search"] app.add_api_route( f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include, dependencies=[Depends(RateLimiter(times=2, seconds=3))] ) async def typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse: """ Lyric search typeahead handler - **query**: Typeahead query """ if not isinstance(data.query, str): return JSONResponse( status_code=500, content={ "err": True, "errorText": "Invalid request.", }, ) typeahead: Optional[list[str]] = await self.cache_utils.check_typeahead( data.query ) if not typeahead: return JSONResponse(content=[]) return JSONResponse(content=typeahead) async def lyric_search_handler(self, data: ValidLyricRequest) -> JSONResponse: """ Search for lyrics - **a**: artist - **s**: song - **t**: track (artist and song combined) [used only if a & s are not used] - **extra**: include extra details in response [optional, default: false] - **lrc**: Request LRCs? - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none] - **src**: the script/utility which initiated the request - **excluded_sources**: sources to exclude [optional, default: none] """ if (not data.a or not data.s) and not data.t or not data.src: raise HTTPException(detail="Invalid request", status_code=500) if data.src.upper() not in self.acceptable_request_sources: await self.notifier.send( f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}", f"Unknown request source: {data.src}", ) return JSONResponse( status_code=500, content={ "err": True, "errorText": f"Unknown request source: {data.src}", }, ) if data.a == "N/A" and data.s == "N/A": return JSONResponse( status_code=200, content={ "test": "success", }, ) if not data.t: search_artist: Optional[str] = data.a search_song: Optional[str] = data.s else: t_split: tuple = tuple(data.t.split(" - ", maxsplit=1)) (search_artist, search_song) = t_split if search_artist and search_song: search_artist = str( self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip()) ) search_song = str( self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip()) ) search_artist = urllib.parse.unquote(search_artist) search_song = urllib.parse.unquote(search_song) if not isinstance(search_artist, str) or not isinstance(search_song, str): return JSONResponse( status_code=500, content={ "err": True, "errorText": "Invalid request", }, ) excluded_sources: Optional[list] = data.excluded_sources aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources) plain_lyrics: bool = not data.lrc result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search( search_artist, search_song, plain_lyrics ) if not result: if not data.lrc: await self.notifier.send( "DEBUG", f"Could not locate lyrics, request was:\n`{data}`" ) return JSONResponse( content={ "err": True, "errorText": "Failed to locate lyrics on any available sources.", } ) result = vars(result) if data.sub and not data.lrc: seeked_found_line: Optional[int] = None lyric_lines: list[str] = result["lyrics"].strip().split(" / ") for i, line in enumerate(lyric_lines): line = regex.sub(r"\u2064", "", line.strip()) if data.sub.strip().lower() in line.strip().lower(): seeked_found_line = i logging.debug( "Found %s at %s, match for %s!", line, seeked_found_line, data.sub, ) # REMOVEME: DEBUG break if not seeked_found_line: return JSONResponse( status_code=500, content={ "err": True, "errorText": "Seek (a.k.a. subsearch) failed.", "failed_seek": True, }, ) result["lyrics"] = " / ".join(lyric_lines[seeked_found_line:]) result["confidence"] = int(result["confidence"]) result["time"] = f"{float(result['time']):.4f}" if plain_lyrics: result["lyrics"] = regex.sub( r"(\s/\s|\n)", "
", result["lyrics"] ).strip() else: # Swap lyrics key for 'lrc' result["lrc"] = result["lyrics"] result.pop("lyrics") if "cache" in result["src"]: result["from_cache"] = True if not data.extra: result.pop("src") return JSONResponse(content=result)