258 lines
9.3 KiB
Python
258 lines
9.3 KiB
Python
#!/usr/bin/env python3.12
|
|
# pylint: disable=bare-except, broad-exception-raised, broad-exception-caught
|
|
|
|
import importlib
|
|
import traceback
|
|
import logging
|
|
import os
|
|
import urllib.parse
|
|
from typing import Optional
|
|
import regex
|
|
import aiohttp
|
|
import aiosqlite as sqlite3
|
|
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
from lyric_search.sources import aggregate
|
|
from lyric_search import notifier
|
|
|
|
|
|
class ValidLyricRequest(BaseModel):
    """
    - **a**: artist
    - **s**: song
    - **t**: track (artist and song combined) [used only if a & s are not used]
    - **extra**: include extra details in response [optional, default: false]
    - **lrc**: Request LRCs?
    - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional]
    - **src**: the script/utility which initiated the request
    - **excluded_sources**: sources to exclude (new only)
    """

    # Search target: either the `a` + `s` pair, or the combined `t` string.
    a: str | None = None
    s: str | None = None
    t: str | None = None

    # Optional text to seek to within the returned lyrics.
    sub: str | None = None

    # Response-shaping flags; both default to off.
    extra: bool | None = False
    lrc: bool | None = False

    # Identifier of the requesting client; the only field without a default.
    src: str

    # Sources the aggregate search should skip.
    excluded_sources: list | None = None

    class Config:  # pylint: disable=missing-class-docstring too-few-public-methods
        # Example request body attached to the model's schema.
        schema_extra = {
            "example": {
                "a": "eminem",
                "s": "rap god",
                "src": "WEB",
                "extra": True,
                "lrc": False,
            },
        }
|
|
|
|
class ValidTypeAheadRequest(BaseModel):
    """
    - **pre_query**: artist to restrict song suggestions to [optional, song typeahead only]
    - **query**: query string
    """

    # Optional; when supplied, the song typeahead uses it as the artist filter.
    pre_query: str | None = None

    # Required partial text typed by the user.
    query: str
|
|
|
|
|
|
class ValidLyricSearchLogRequest(BaseModel):
    """
    - **webradio**: whether or not to include requests generated automatically by the radio page on codey.lol, defaults to False
    """

    # Off by default, so automatically generated radio requests are excluded.
    webradio: bool = False
|
|
|
|
|
|
class CacheUtils:
|
|
"""Lyrics Cache DB Utils"""
|
|
def __init__(self):
|
|
self.lyrics_db_path = os.path.join("/", "usr", "local", "share",
|
|
"sqlite_dbs", "cached_lyrics.db")
|
|
|
|
async def check_typeahead(self, s: str, pre_query: str | None = None):
|
|
"""Check s against artists stored - for typeahead"""
|
|
async with sqlite3.connect(self.lyrics_db_path,
|
|
timeout=2) as db_conn:
|
|
db_conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])
|
|
if not pre_query:
|
|
query = "SELECT distinct(artist) FROM lyrics WHERE artist LIKE ? LIMIT 15"
|
|
query_params = (f"%{s}%",)
|
|
else:
|
|
query = "SELECT distinct(song) FROM lyrics WHERE artist LIKE ? AND song LIKE ? LIMIT 15"
|
|
query_params = (f"%{pre_query}%", f"%{s}%",)
|
|
async with await db_conn.execute(query, query_params) as db_cursor:
|
|
return await db_cursor.fetchall()
|
|
|
|
|
|
class LyricSearch(FastAPI):
    """
    Lyric Search Endpoint

    Registers the typeahead and lyric search handlers as POST routes on the
    FastAPI application passed to the constructor.
    """

    def __init__(self, app: FastAPI, util, constants, glob_state):  # pylint: disable=super-init-not-called
        """
        Store collaborators and register every endpoint on app.

        - app: FastAPI application the routes are added to
        - util / constants / glob_state: shared project objects, stored as-is
          (constants must provide DOUBLE_SPACE_REGEX)
        """
        self.app = app
        self.util = util
        self.constants = constants
        self.glob_state = glob_state
        self.cache_utils = CacheUtils()
        self.notifier = notifier.DiscordNotifier()

        # Route path (without surrounding slashes) -> handler coroutine.
        self.endpoints = {
            "typeahead/artist": self.artist_typeahead_handler,
            "typeahead/song": self.song_typeahead_handler,
            "lyric_search": self.lyric_search_handler,
            # "lyric_cache_list": self.lyric_cache_list_handler,
        }

        # Values accepted (case-insensitively) for the request's `src` field.
        self.acceptable_request_sources = [
            "WEB",
            "WEB-RADIO",
            "IRC-MS",
            "IRC-FS",
            "IRC-KALI",
            "DISC-ACES",
            "DISC-HAVOC",
            "IRC-SHARED",
            "LIMNORIA-SHARED",
        ]

        # Matches an LRC timestamp tag such as "[01:23.45]" plus trailing text.
        self.lrc_regex = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}')

        for endpoint, handler in self.endpoints.items():
            app.add_api_route(f"/{endpoint}/", handler, methods=["POST"])

    # async def lyric_cache_list_handler(self):
    #     """
    #     Get currently cached lyrics entries
    #     """
    #     return {
    #         'err': False,
    #         'data': await self.lyrics_engine.listCacheEntries()
    #     }

    # NOTE: the handlers below deliberately carry no return annotation, and
    # their docstrings are kept verbatim; FastAPI may use both when building
    # the route's response model and description.

    async def artist_typeahead_handler(self, data: ValidTypeAheadRequest):
        """Artist Type Ahead Handler"""
        # Require at least two characters before querying the cache.
        if not isinstance(data.query, str) or len(data.query) < 2:
            return {
                'err': True,
                'errorText': 'Invalid request',
            }
        query = data.query
        typeahead_result = await self.cache_utils.check_typeahead(query)
        typeahead_list = [str(r.get('artist')) for r in typeahead_result]
        return typeahead_list

    async def song_typeahead_handler(self, data: ValidTypeAheadRequest):
        """Song Type Ahead Handler"""
        # pre_query (the artist) is mandatory for song suggestions.
        if not isinstance(data.pre_query, str)\
                or not isinstance(data.query, str | None):
            return {
                'err': True,
                'errorText': 'Invalid request',
            }
        pre_query = data.pre_query
        query = data.query
        typeahead_result = await self.cache_utils.check_typeahead(query, pre_query)
        typeahead_list = [str(r.get('song')) for r in typeahead_result]
        return typeahead_list

    # async def lyric_search_log_handler(self, data: ValidLyricSearchLogRequest):
    #     """Lyric Search Log Handler"""
    #     include_radio = data.webradio
    #     await self.glob_state.increment_counter('lyrichistory_requests')
    #     last_10k_sings = await self.lyrics_engine.getHistory(limit=10000, webradio=include_radio)
    #     return {
    #         'err': False,
    #         'history': last_10k_sings
    #     }

    async def lyric_search_handler(self, data: ValidLyricRequest):
        """
        Search for lyrics

        - **a**: artist
        - **s**: song
        - **t**: track (artist and song combined) [used only if a & s are not used]
        - **extra**: include extra details in response [optional, default: false] [unused]
        - **lrc**: Request LRCs?
        - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none]
        - **src**: the script/utility which initiated the request
        - **excluded_sources**: sources to exclude [optional, default: none]
        """

        # A request needs (artist AND song) or a combined track, plus a source.
        # NOTE(review): 500 is kept for compatibility with existing clients,
        # although a 4xx status would describe a malformed request better.
        if ((not data.a or not data.s) and not data.t) or not data.src:
            raise HTTPException(detail="Invalid request", status_code=500)

        if data.src.upper() not in self.acceptable_request_sources:
            source_file = __file__.rsplit("/", maxsplit=1)[-1]
            await self.notifier.send(f"ERROR @ {source_file}",
                                     f"Unknown request source: {data.src}")
            return {
                'err': True,
                'errorText': f'Unknown request source: {data.src}',
            }

        search_artist: str
        search_song: str
        if not data.t:
            search_artist = data.a
            search_song = data.s
        else:
            # NOTE(review): `t` takes precedence over `a`/`s` whenever it is
            # supplied, although the docstring suggests the opposite - confirm.
            search_artist, separator, search_song = data.t.partition(" - ")
            if not separator:
                # No "artist - song" separator: reject the request instead of
                # failing with an unhandled IndexError.
                raise HTTPException(detail="Invalid request", status_code=500)

        if search_artist and search_song:
            # Collapse repeated whitespace first, then decode URL escapes.
            search_artist = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip())
            search_song = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip())
            search_artist = urllib.parse.unquote(search_artist)
            search_song = urllib.parse.unquote(search_song)

        excluded_sources = data.excluded_sources
        aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
        plain_lyrics = not data.lrc
        result = await aggregate_search.search(search_artist, search_song, plain_lyrics)

        if not result:
            return {
                'err': True,
                'errorText': 'Sources exhausted, lyrics not located.',
            }

        result = result.todict()

        # Seeking only applies to plain lyrics; the needle is normalised once.
        needle = data.sub.strip().lower() if data.sub else ""
        if needle and not data.lrc:
            seeked_found_line = None
            lyric_lines = result['lyrics'].strip().split(" / ")
            for i, line in enumerate(lyric_lines):
                # Drop U+2064 characters before comparing.
                line = line.strip().replace('\u2064', '')
                if needle in line.strip().lower():
                    seeked_found_line = i
                    logging.debug("Found %s at %s, match for %s!",
                                  line, seeked_found_line, data.sub)  # REMOVEME: DEBUG
                    break

            # Compare against None: index 0 is a valid match on the first line.
            if seeked_found_line is None:
                return {
                    'failed_seek': True,
                }
            result['lyrics'] = " / ".join(lyric_lines[seeked_found_line:])

        result['confidence'] = int(result.get('confidence', 0))
        result['time'] = f"{float(result['time']):.4f}"

        if plain_lyrics:
            # Line separators (" / " or newline) become HTML line breaks.
            result['lyrics'] = regex.sub(r'(\s/\s|\n)', '<br>', result['lyrics']).strip()
        else:
            # Swap lyrics key for 'lrc'
            result['lrc'] = result.pop('lyrics')

        if "cache" in result['src']:
            result['from_cache'] = True

        # REMOVE BELOW AFTER TESTING IS DONE
        # if not data.extra:
        #     result.pop('src')
        return result
|