198 lines
8.1 KiB
Python
198 lines
8.1 KiB
Python
import logging
|
|
import os
|
|
import urllib.parse
|
|
import regex
|
|
import aiosqlite as sqlite3
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from typing import LiteralString, Optional, Union
|
|
from regex import Pattern
|
|
from .constructors import ValidTypeAheadRequest, ValidLyricRequest
|
|
from lyric_search.constructors import LyricsResult
|
|
from lyric_search.sources import aggregate
|
|
from lyric_search import notifier
|
|
|
|
class CacheUtils:
|
|
"""
|
|
Lyrics Cache DB Utils
|
|
"""
|
|
def __init__(self) -> None:
|
|
self.lyrics_db_path: LiteralString = os.path.join("/usr/local/share",
|
|
"sqlite_dbs", "cached_lyrics.db")
|
|
|
|
async def check_typeahead(self, s: str,
|
|
pre_query: Optional[str] = None) -> list[dict]:
|
|
"""
|
|
Check s against artists stored - for typeahead
|
|
"""
|
|
async with sqlite3.connect(self.lyrics_db_path,
|
|
timeout=2) as db_conn:
|
|
db_conn.row_factory = sqlite3.Row
|
|
if not pre_query:
|
|
query: str = "SELECT distinct(artist) FROM lyrics WHERE artist LIKE ? LIMIT 15"
|
|
query_params: tuple = (f"%{s}%",)
|
|
else:
|
|
query = "SELECT distinct(song) FROM lyrics WHERE artist LIKE ? AND song LIKE ? LIMIT 15"
|
|
query_params = (f"%{pre_query}%", f"%{s}%",)
|
|
async with await db_conn.execute(query, query_params) as db_cursor:
|
|
return await db_cursor.fetchall()
|
|
|
|
|
|
class LyricSearch(FastAPI):
|
|
"""
|
|
Lyric Search Endpoint
|
|
"""
|
|
def __init__(self, app: FastAPI,
|
|
util, constants) -> None:
|
|
self.app: FastAPI = app
|
|
self.util = util
|
|
self.constants = constants
|
|
self.cache_utils = CacheUtils()
|
|
self.notifier = notifier.DiscordNotifier()
|
|
|
|
|
|
self.endpoints: dict = {
|
|
"typeahead/artist": self.artist_typeahead_handler,
|
|
"typeahead/song": self.song_typeahead_handler,
|
|
"lyric_search": self.lyric_search_handler, # Preserving old endpoint path temporarily
|
|
"lyric/search": self.lyric_search_handler,
|
|
}
|
|
|
|
self.acceptable_request_sources: list = [
|
|
"WEB",
|
|
"WEB-RADIO",
|
|
"DISC-HAVOC",
|
|
"LIMNORIA-SHARED",
|
|
"IRC-SHARED",
|
|
]
|
|
|
|
self.lrc_regex: Pattern = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}')
|
|
|
|
for endpoint, handler in self.endpoints.items():
|
|
_schema_include = endpoint in ["lyric/search"]
|
|
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include)
|
|
|
|
async def artist_typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse:
|
|
"""
|
|
Artist Type Ahead Handler
|
|
- **query**: The query
|
|
"""
|
|
if not isinstance(data.query, str) or len(data.query) < 2:
|
|
return JSONResponse(status_code=500, content={
|
|
'err': True,
|
|
'errorText': 'Invalid request',
|
|
})
|
|
query: str = data.query
|
|
typeahead_result: list[dict] = await self.cache_utils.check_typeahead(query)
|
|
typeahead_list: list[str] = [str(r['artist']) for r in typeahead_result]
|
|
return JSONResponse(content=typeahead_list)
|
|
|
|
async def song_typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse:
|
|
"""
|
|
Song Type Ahead Handler
|
|
- **query**: The query
|
|
- **pre_query**: The pre-query (artist)
|
|
"""
|
|
if not isinstance(data.pre_query, str)\
|
|
or not isinstance(data.query, str):
|
|
return JSONResponse(status_code=500, content={
|
|
'err': True,
|
|
'errorText': 'Invalid request',
|
|
})
|
|
pre_query: str = data.pre_query
|
|
query: str = data.query
|
|
typeahead_result: list[dict] = await self.cache_utils.check_typeahead(query, pre_query)
|
|
typeahead_list: list[str] = [str(r['song']) for r in typeahead_result]
|
|
return JSONResponse(content=typeahead_list)
|
|
|
|
async def lyric_search_handler(self, data: ValidLyricRequest) -> JSONResponse:
|
|
"""
|
|
Search for lyrics
|
|
- **a**: artist
|
|
- **s**: song
|
|
- **t**: track (artist and song combined) [used only if a & s are not used]
|
|
- **extra**: include extra details in response [optional, default: false]
|
|
- **lrc**: Request LRCs?
|
|
- **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none]
|
|
- **src**: the script/utility which initiated the request
|
|
- **excluded_sources**: sources to exclude [optional, default: none]
|
|
"""
|
|
if (not data.a or not data.s) and not data.t or not data.src:
|
|
raise HTTPException(detail="Invalid request", status_code=500)
|
|
|
|
if data.src.upper() not in self.acceptable_request_sources:
|
|
await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
|
|
f"Unknown request source: {data.src}")
|
|
return JSONResponse(status_code=500, content={
|
|
'err': True,
|
|
'errorText': f'Unknown request source: {data.src}',
|
|
})
|
|
|
|
if not data.t:
|
|
search_artist: Optional[str] = data.a
|
|
search_song: Optional[str] = data.s
|
|
else:
|
|
t_split: tuple = tuple(data.t.split(" - ", maxsplit=1))
|
|
(search_artist, search_song) = t_split
|
|
|
|
if search_artist and search_song:
|
|
search_artist = str(self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip()))
|
|
search_song = str(self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip()))
|
|
search_artist = urllib.parse.unquote(search_artist)
|
|
search_song = urllib.parse.unquote(search_song)
|
|
|
|
if not isinstance(search_artist, str) or not isinstance(search_song, str):
|
|
return JSONResponse(status_code=500, content={
|
|
'err': True,
|
|
'errorText': 'Invalid request',
|
|
})
|
|
|
|
excluded_sources: Optional[list] = data.excluded_sources
|
|
aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
|
|
plain_lyrics: bool = not data.lrc
|
|
result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search(search_artist, search_song, plain_lyrics)
|
|
|
|
if not result:
|
|
return JSONResponse(content={
|
|
'err': True,
|
|
'errorText': 'Sources exhausted, lyrics not located.',
|
|
})
|
|
|
|
result = vars(result)
|
|
|
|
if data.sub and not data.lrc:
|
|
seeked_found_line: Optional[int] = None
|
|
lyric_lines: list[str] = result['lyrics'].strip().split(" / ")
|
|
for i, line in enumerate(lyric_lines):
|
|
line = regex.sub(r'\u2064', '', line.strip())
|
|
if data.sub.strip().lower() in line.strip().lower():
|
|
seeked_found_line = i
|
|
logging.debug("Found %s at %s, match for %s!",
|
|
line, seeked_found_line, data.sub) # REMOVEME: DEBUG
|
|
break
|
|
|
|
if not seeked_found_line:
|
|
return JSONResponse(status_code=500, content={
|
|
'err': True,
|
|
'errorText': 'Seek (a.k.a. subsearch) failed.',
|
|
'failed_seek': True,
|
|
})
|
|
result['lyrics'] = " / ".join(lyric_lines[seeked_found_line:])
|
|
|
|
result['confidence'] = int(result['confidence'])
|
|
result['time'] = f'{float(result['time']):.4f}'
|
|
|
|
if plain_lyrics:
|
|
result['lyrics'] = regex.sub(r'(\s/\s|\n)', '<br>', result['lyrics']).strip()
|
|
else:
|
|
# Swap lyrics key for 'lrc'
|
|
result['lrc'] = result['lyrics']
|
|
result.pop('lyrics')
|
|
|
|
if "cache" in result['src']:
|
|
result['from_cache'] = True
|
|
|
|
if not data.extra:
|
|
result.pop('src')
|
|
|
|
return JSONResponse(content=result) |