import logging import os import urllib.parse import regex import aiosqlite as sqlite3 from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse from typing import LiteralString, Optional, Union, Iterable from regex import Pattern from .constructors import ValidTypeAheadRequest, ValidLyricRequest from lyric_search.constructors import LyricsResult from lyric_search.sources import aggregate from lyric_search import notifier class CacheUtils: """ Lyrics Cache DB Utils """ def __init__(self) -> None: self.lyrics_db_path: LiteralString = os.path.join( "/usr/local/share", "sqlite_dbs", "cached_lyrics.db" ) async def check_typeahead(self, query: str) -> Optional[list[str]]: """Lyric Search Typeahead DB Handler""" if not query: return None async with sqlite3.connect(self.lyrics_db_path, timeout=1) as _db: _db.row_factory = sqlite3.Row db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\ (TRIM(artist) || " - " || TRIM(song)) as ret FROM lyrics WHERE\ ret LIKE ? LIMIT 100""" db_params: tuple[str] = (f"%%%{query}%%%",) async with _db.execute(db_query, db_params) as _cursor: result: Iterable[sqlite3.Row] = await _cursor.fetchall() out_result = [str(r["ret"]) for r in result] return out_result class LyricSearch(FastAPI): """ Lyric Search Endpoint """ def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util self.constants = constants self.cache_utils = CacheUtils() self.notifier = notifier.DiscordNotifier() self.endpoints: dict = { "typeahead/lyrics": self.typeahead_handler, "lyric_search": self.lyric_search_handler, # Preserving old endpoint path temporarily "lyric/search": self.lyric_search_handler, } self.acceptable_request_sources: list = [ "WEB", "WEB-RADIO", "DISC-HAVOC", "LIMNORIA-SHARED", "IRC-SHARED", ] self.lrc_regex: Pattern = regex.compile( r"\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}" ) for endpoint, handler in self.endpoints.items(): _schema_include = endpoint in ["lyric/search"] app.add_api_route( f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include, ) async def typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse: """ Lyric search typeahead handler - **query**: Typeahead query """ if not isinstance(data.query, str): return JSONResponse( status_code=500, content={ "err": True, "errorText": "Invalid request.", }, ) typeahead: Optional[list[str]] = await self.cache_utils.check_typeahead( data.query ) if not typeahead: return JSONResponse(content=[]) return JSONResponse(content=typeahead) async def lyric_search_handler(self, data: ValidLyricRequest) -> JSONResponse: """ Search for lyrics - **a**: artist - **s**: song - **t**: track (artist and song combined) [used only if a & s are not used] - **extra**: include extra details in response [optional, default: false] - **lrc**: Request LRCs? - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none] - **src**: the script/utility which initiated the request - **excluded_sources**: sources to exclude [optional, default: none] """ if (not data.a or not data.s) and not data.t or not data.src: raise HTTPException(detail="Invalid request", status_code=500) if data.src.upper() not in self.acceptable_request_sources: await self.notifier.send( f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", f"Unknown request source: {data.src}", ) return JSONResponse( status_code=500, content={ "err": True, "errorText": f"Unknown request source: {data.src}", }, ) if not data.t: search_artist: Optional[str] = data.a search_song: Optional[str] = data.s else: t_split: tuple = tuple(data.t.split(" - ", maxsplit=1)) (search_artist, search_song) = t_split if search_artist and search_song: search_artist = str( self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip()) ) search_song = str( self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip()) ) search_artist = urllib.parse.unquote(search_artist) search_song = urllib.parse.unquote(search_song) if not isinstance(search_artist, str) or not isinstance(search_song, str): return JSONResponse( status_code=500, content={ "err": True, "errorText": "Invalid request", }, ) excluded_sources: Optional[list] = data.excluded_sources aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources) plain_lyrics: bool = not data.lrc result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search( search_artist, search_song, plain_lyrics ) if not result: return JSONResponse( content={ "err": True, "errorText": "Sources exhausted, lyrics not located.", } ) result = vars(result) if data.sub and not data.lrc: seeked_found_line: Optional[int] = None lyric_lines: list[str] = result["lyrics"].strip().split(" / ") for i, line in enumerate(lyric_lines): line = regex.sub(r"\u2064", "", line.strip()) if data.sub.strip().lower() in line.strip().lower(): seeked_found_line = i logging.debug( "Found %s at %s, match for %s!", line, seeked_found_line, data.sub, ) # REMOVEME: DEBUG break if not seeked_found_line: return JSONResponse( status_code=500, content={ "err": True, "errorText": "Seek (a.k.a. subsearch) failed.", "failed_seek": True, }, ) result["lyrics"] = " / ".join(lyric_lines[seeked_found_line:]) result["confidence"] = int(result["confidence"]) result["time"] = f"{float(result['time']):.4f}" if plain_lyrics: result["lyrics"] = regex.sub( r"(\s/\s|\n)", "
", result["lyrics"] ).strip() else: # Swap lyrics key for 'lrc' result["lrc"] = result["lyrics"] result.pop("lyrics") if "cache" in result["src"]: result["from_cache"] = True if not data.extra: result.pop("src") return JSONResponse(content=result)