lyric_search_new renamed to lyric_search
4
lyric_search/__init__.py
Normal file
@@ -0,0 +1,4 @@
#!/usr/bin/env python3.12
# pylint: disable=empty-docstring
"""
"""
25
lyric_search/constructors.py
Normal file
@@ -0,0 +1,25 @@
#!/usr/bin/env python3.12

from dataclasses import dataclass, asdict


@dataclass
class LyricsResult:
    """
    Class for returned Lyrics Results
    Attributes:
        artist (str): returned artist
        song (str): returned song
        src (str): source result was fetched from
        lyrics (str|list): str if plain lyrics, list for lrc
        confidence (int): % confidence of the match
        time (float): time taken to retrieve lyrics from source
    """
    artist: str
    song: str
    src: str
    lyrics: str|list
    confidence: int
    time: float = 0.00

    def todict(self) -> dict:
        """Return as dict"""
        return asdict(self)
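A minimal usage sketch (not part of the commit) showing how a LyricsResult round-trips through todict(); the field values are illustrative:

    from lyric_search.constructors import LyricsResult

    res = LyricsResult(artist="Artist", song="Song", src="LRCLib",
                       lyrics="line one / line two", confidence=95)
    print(res.todict())
    # -> {'artist': 'Artist', 'song': 'Song', 'src': 'LRCLib',
    #     'lyrics': 'line one / line two', 'confidence': 95, 'time': 0.0}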
0
lyric_search/sources/__init__.py
Normal file
80
lyric_search/sources/aggregate.py
Normal file
@@ -0,0 +1,80 @@
#!/usr/bin/env python3.12
# pylint: disable=wrong-import-order, wrong-import-position

from typing import Optional
from lyric_search.constructors import LyricsResult
from lyric_search import notifier
import sys
import logging
import traceback
sys.path.insert(1, '..')
from . import cache, redis_cache, genius, lrclib

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class Aggregate:
    """
    Aggregate all source methods
    """

    def __init__(self, exclude_methods=None) -> None:
        if not exclude_methods:
            exclude_methods: list = []
        self.exclude_methods = exclude_methods
        self.redis_cache = redis_cache.RedisCache()
        self.notifier = notifier.DiscordNotifier()

    async def search(self, artist: str, song: str, plain: bool = True) -> Optional[LyricsResult]:
        """
        Aggregate Search
        Args:
            artist (str): Artist to search
            song (str): Song to search
            plain (bool): Search for plain lyrics (lrc otherwise)
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
        """
        if not plain:
            logging.info("LRCs requested, limiting search to LRCLib")
            self.exclude_methods = ["genius", "cache"]
        logging.info("Performing aggregate search")
        cache_search = cache.Cache()
        genius_search = genius.Genius()
        lrclib_search = lrclib.LRCLib()
        sources: list = [
            cache_search,
            lrclib_search,
            genius_search,
        ]
        if not plain:
            sources = [lrclib_search]  # Only LRCLib supports synced lyrics
        search_result: Optional[LyricsResult] = None
        for source in sources:
            if source.label.lower() in self.exclude_methods:
                if source.label.lower() == "cache" or not plain:
                    logging.info("Exclude conditions rejected - source requested to exclude: %s, plain: %s",
                                 source.label, plain)
                else:
                    if plain:
                        logging.info("Skipping source: %s, excluded.", source.label)
                        continue

            search_result = await source.search(artist=artist, song=song,
                                                plain=plain)
            if search_result:
                break
            logging.info("%s: NOT FOUND!", source.label)
        if not search_result:
            logging.info("%s - %s: all sources exhausted, not found.",
                         artist, song)
            if plain:  # do not record LRC fails
                try:
                    await self.redis_cache.increment_found_count("failed")
                except Exception as e:
                    traceback.print_exc()
                    logging.info("Could not increment redis failed counter: %s",
                                 str(e))
                    await self.notifier.send(f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
                                             f"Could not increment redis failed counter: {str(e)}")
        return search_result
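A hedged usage sketch (not part of the commit; assumes the package's private credentials module and backing services are configured) showing how the aggregate search is driven from asyncio:

    import asyncio
    from lyric_search.sources.aggregate import Aggregate

    async def main():
        agg = Aggregate(exclude_methods=["genius"])
        result = await agg.search("rick astley", "never gonna give you up", plain=True)
        if result:
            print(f"{result.src}: {result.confidence}% in {result.time:.2f}s")

    asyncio.run(main())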
317
lyric_search/sources/cache.py
Normal file
@@ -0,0 +1,317 @@
#!/usr/bin/env python3.12
# pylint: disable=wrong-import-order, wrong-import-position, bare-except, broad-exception-caught

import os
import time
import regex
import logging
import sys
import traceback
sys.path.insert(1, '..')
sys.path.insert(1, '.')
from typing import Optional, Any
import aiosqlite as sqlite3
from . import redis_cache
from lyric_search import utils, notifier
from lyric_search.constructors import LyricsResult

logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)

class Cache:
    """Cache Search Module"""
    def __init__(self) -> None:
        self.cache_db: str = os.path.join("/", "var",
                                          "lib", "singerdbs",
                                          "cached_lyrics.db")
        self.redis_cache = redis_cache.RedisCache()
        self.notifier = notifier.DiscordNotifier()

        self.cache_pre_query: str = "pragma journal_mode = WAL; pragma synchronous = normal;\
            pragma temp_store = memory; pragma mmap_size = 30000000000;"
        self.sqlite_exts: list[str] = ['/usr/local/lib/python3.11/dist-packages/spellfix1.cpython-311-x86_64-linux-gnu.so']
        self.label: str = "Cache"

    def get_matched(self, matched_candidate: tuple, confidence: int,
                    sqlite_rows: Optional[list[sqlite3.Row]] = None, redis_results: Any = None) -> Optional[LyricsResult]:
        """
        Get Matched Result
        Args:
            matched_candidate (tuple): the correctly matched candidate returned by matcher.best_match
            confidence (int): % confidence
            sqlite_rows (list[sqlite3.Row]|None): List of returned rows from SQLite DB, or None if Redis
            redis_results (Any): List of Redis returned data, or None if SQLite
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
        """
        matched_id: int = matched_candidate[0]
        if redis_results:
            for res in redis_results:
                (key, row) = res
                if key == matched_id:
                    return LyricsResult(
                        artist=row['artist'],
                        song=row['song'],
                        lyrics=row['lyrics'],
                        src=f"{row['src']} (redis cache, id: {key})",
                        confidence=row['confidence']
                    )
        else:
            for row in sqlite_rows:
                if row[0] == matched_id:
                    (_id, artist, song, lyrics, original_src, _confidence) = row
                    return LyricsResult(
                        artist=artist,
                        song=song,
                        lyrics=lyrics,
                        src=f"{original_src} (cached, id: {_id})",
                        confidence=confidence)
        return None

    async def check_existence(self, artistsong: str) -> Optional[bool]:
        """
        Check whether lyrics are already stored for track
        Args:
            artistsong (str): artist and song in artist\\nsong format
        Returns:
            bool: Whether track was found in cache
        """
        logging.debug("Checking whether %s is already stored",
                      artistsong.replace("\n", " - "))
        check_query: str = 'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
            <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
        artistsong_split = artistsong.split("\n", maxsplit=1)
        artist = artistsong_split[0].lower()
        song = artistsong_split[1].lower()
        params = (artist, song, artistsong.lower())
        async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
            await db_conn.enable_load_extension(True)
            for ext in self.sqlite_exts:
                await db_conn.load_extension(ext)
            async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
                async with await db_conn.execute(check_query, params) as db_cursor:
                    result = await db_cursor.fetchone()
                    if result:
                        logging.debug("%s is already stored.",
                                      artistsong.replace("\n", " - "))
                        return True
                    logging.debug("%s cleared to be stored.",
                                  artistsong)
                    return False

    async def store(self, lyr_result: LyricsResult) -> None:
        """
        Store lyrics (SQLite, then Redis)
        Args:
            lyr_result (LyricsResult): the returned lyrics to cache
        Returns: None
        """

        try:
            sqlite_insert_id = await self.sqlite_store(lyr_result)
            if sqlite_insert_id:
                await self.redis_cache.redis_store(sqlite_insert_id, lyr_result)
        except Exception as e:
            traceback.print_exc()
            logging.error("ERROR @ %s: %s",
                          __file__.rsplit("/", maxsplit=1)[-1], f"cache::store >> {str(e)}")
            await self.notifier.send(f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
                                     f"cache::store >> {str(e)}")

    async def sqlite_rowcount(self, where: Optional[str] = None, params: Optional[tuple] = None) -> int:
        """
        Get rowcount for cached_lyrics DB
        Args:
            where (Optional[str]): WHERE ext for query if needed
            params (Optional[tuple]): Parameters to query, if where is specified
        Returns:
            int: Number of rows found
        """
        async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            query = f"SELECT count(id) AS rowcount FROM lyrics {where}".strip()
            async with await db_conn.execute(query, params) as db_cursor:
                result = await db_cursor.fetchone()
                return result['rowcount']

    async def sqlite_distinct(self, column: str) -> int:
        """
        Get count of distinct values for a column
        Args:
            column (str): The column to check
        Returns:
            int: Number of distinct values found
        """
        async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            query = f"SELECT COUNT(DISTINCT {column}) as distinct_items FROM lyrics"
            async with await db_conn.execute(query) as db_cursor:
                result = await db_cursor.fetchone()
                return result['distinct_items']

    async def sqlite_lyrics_length(self) -> int:
        """
        Get total length of text stored for lyrics
        Args:
            None
        Returns:
            int: Total length of stored lyrics
        """
        async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            query = "SELECT SUM(LENGTH(lyrics)) as lyrics_len FROM lyrics"
            async with await db_conn.execute(query) as db_cursor:
                result = await db_cursor.fetchone()
                return result['lyrics_len']

    async def sqlite_store(self, lyr_result: LyricsResult) -> Optional[int]:
        """
        Store lyrics to SQLite Cache
        Args:
            lyr_result (LyricsResult): the returned lyrics to cache
        Returns:
            int|None: the inserted row id, or None if storage was skipped or failed
        """

        logging.info("Storing %s",
                     f"{lyr_result.artist} - {lyr_result.song}")

        if lyr_result.src.lower() == "cache":
            logging.info("Skipping cache storage - returned LyricsResult originated from cache")
            return None

        artistsong = f"{lyr_result.artist}\n{lyr_result.song}"
        if await self.check_existence(artistsong):
            logging.info("Skipping cache storage - %s is already stored.",
                         artistsong.replace("\n", " - "))
            return None

        try:
            lyrics = regex.sub(r'(<br>|\n|\r\n)', ' / ', lyr_result.lyrics.strip())
            lyrics = regex.sub(r'\s{2,}', ' ', lyrics)

            insert_query = "INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics)\
                VALUES(?, ?, ?, ?, ?, ?, ?)"
            params = (lyr_result.src, time.time(), lyr_result.artist,
                      lyr_result.song, artistsong, lyr_result.confidence, lyrics)

            async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
                async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
                    async with await db_conn.execute(insert_query, params) as _cursor:
                        await db_conn.commit()
                        logging.info("Stored %s to SQLite!", artistsong.replace("\n", " - "))
                        return _cursor.lastrowid
        except:
            logging.critical("Cache storage error!")
            traceback.print_exc()
            return None

    # pylint: disable=unused-argument
    async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
        """
        Cache Search
        Args:
            artist: the artist to search
            song: the song to search
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
        """
        try:
            # pylint: enable=unused-argument
            artist: str = artist.strip().lower()
            song: str = song.strip().lower()
            input_track: str = f"{artist} - {song}"
            search_query = None
            search_params: Optional[tuple] = None
            random_search: bool = False
            time_start: float = time.time()
            matcher = utils.TrackMatcher()

            if artist == "!" and song == "!":
                random_search = True
                search_query: str = 'SELECT id, artist, song, lyrics, src, confidence\
                    FROM lyrics ORDER BY RANDOM() LIMIT 1'

            logging.info("Searching %s - %s on %s",
                         artist, song, self.label)

            # Check Redis first
            logging.debug("Checking redis cache for %s...",
                          f"{artist} - {song}")
            redis_result = await self.redis_cache.search(artist=artist,
                                                         song=song)

            if redis_result:
                result_tracks: list = []
                for returned in redis_result:
                    (key, track) = returned
                    result_tracks.append((key, f"{track['artist']} - {track['song']}"))

                if not random_search:
                    best_match: tuple|None = matcher.find_best_match(input_track=input_track,
                                                                     candidate_tracks=result_tracks)
                else:
                    best_match = (result_tracks[0], 100)

                if best_match:
                    (candidate, confidence) = best_match
                    matched = self.get_matched(redis_results=redis_result, matched_candidate=candidate,
                                               confidence=confidence)

                    if matched:
                        time_end: float = time.time()
                        time_diff: float = time_end - time_start
                        matched.confidence = confidence
                        matched.time = time_diff

                        logging.info("Found %s on redis cache, skipping SQLite...",
                                     f"{artist} - {song}")
                        await self.redis_cache.increment_found_count(self.label)
                        return matched

            # SQLite fallback
            async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
                await db_conn.enable_load_extension(True)
                for ext in self.sqlite_exts:
                    await db_conn.load_extension(ext)
                async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
                    if not random_search:
                        search_query: str = 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\
                            WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
                            <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 10'
                        search_params: tuple = (artist.strip(), song.strip(),
                                                f"{artist.strip()} {song.strip()}")

                    async with await _db_cursor.execute(search_query, search_params) as db_cursor:
                        results: list = await db_cursor.fetchall()
                        result_tracks: list = []
                        for track in results:
                            (_id, _artist, _song, _lyrics, _src, _confidence) = track
                            result_tracks.append((_id, f"{_artist} - {_song}"))
                        if not random_search:
                            best_match: tuple|None = matcher.find_best_match(input_track=input_track,
                                                                             candidate_tracks=result_tracks)
                        else:
                            best_match = (result_tracks[0], 100)
                        if not best_match:
                            return None
                        (candidate, confidence) = best_match
                        logging.info("Result found on %s", self.label)
                        matched = self.get_matched(sqlite_rows=results,
                                                   matched_candidate=candidate,
                                                   confidence=confidence)
                        time_end: float = time.time()
                        time_diff: float = time_end - time_start
                        matched.time = time_diff
                        await self.redis_cache.increment_found_count(self.label)
                        return matched
        except:
            traceback.print_exc()
            return None
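A hedged sketch (not part of the commit; assumes the SQLite DB exists at the configured /var/lib/singerdbs path) of the cache statistics helpers:

    import asyncio
    from lyric_search.sources.cache import Cache

    async def stats():
        c = Cache()
        print("rows:", await c.sqlite_rowcount())
        print("distinct artists:", await c.sqlite_distinct("artist"))
        print("total lyric chars:", await c.sqlite_lyrics_length())

    asyncio.run(stats())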
5
lyric_search/sources/common.py
Normal file
@@ -0,0 +1,5 @@
#!/usr/bin/env python3.12
SCRAPE_HEADERS = {
    'accept': '*/*',
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0',
}
132
lyric_search/sources/genius.py
Normal file
@@ -0,0 +1,132 @@
#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught, wrong-import-order, wrong-import-position

import sys
sys.path.insert(1, '..')
import traceback
import logging
import time
from typing import Optional
from aiohttp import ClientTimeout, ClientSession
from bs4 import BeautifulSoup, ResultSet
import html as htm
from . import private, common, cache, redis_cache
from lyric_search import utils
from lyric_search.constructors import LyricsResult

logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)

class InvalidResponseException(Exception):
    """
    InvalidResponseException
    """

class Genius:
    """Genius Search Module"""
    def __init__(self) -> None:
        self.label: str = "Genius"
        self.genius_url: str = private.GENIUS_URL
        self.genius_search_url: str = f'{self.genius_url}api/search/song?q='
        self.headers: dict = common.SCRAPE_HEADERS
        self.timeout = ClientTimeout(connect=3, sock_read=5)
        self.datautils = utils.DataUtils()
        self.matcher = utils.TrackMatcher()
        self.cache = cache.Cache()
        self.redis_cache = redis_cache.RedisCache()

    # pylint: disable=unused-argument
    async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
        """
        Genius Search
        Args:
            artist (str): the artist to search
            song (str): the song to search
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
        """
        try:
            # pylint: enable=unused-argument
            artist: str = artist.strip().lower()
            song: str = song.strip().lower()
            time_start: float = time.time()
            logging.info("Searching %s - %s on %s",
                         artist, song, self.label)
            search_term: str = f'{artist}%20{song}'
            returned_lyrics: str = ''
            async with ClientSession() as client:
                async with client.get(f'{self.genius_search_url}{search_term}',
                                      timeout=self.timeout,
                                      headers=self.headers) as request:
                    request.raise_for_status()
                    text: str|None = await request.text()

                    if len(text) < 100:
                        raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
                    search_data = await request.json()

                    if not isinstance(search_data, dict):
                        raise InvalidResponseException("Invalid JSON.")

                    if not isinstance(search_data['response'], dict):
                        raise InvalidResponseException(f"Invalid JSON: Cannot find response key.\n{search_data}")

                    if not isinstance(search_data['response']['sections'], list):
                        raise InvalidResponseException(f"Invalid JSON: Cannot find response->sections key.\n{search_data}")

                    if not isinstance(search_data['response']['sections'][0]['hits'], list):
                        raise InvalidResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.")

                    possible_matches: list = search_data['response']['sections'][0]['hits']
                    to_scrape: list[tuple] = [
                        (
                            returned['result']['path'],
                            f'{returned["result"]["artist_names"]} - {returned["result"]["title"]}',
                        ) for returned in possible_matches
                    ]
                    searched: str = f"{artist} - {song}"
                    best_match: tuple = self.matcher.find_best_match(input_track=searched,
                                                                     candidate_tracks=to_scrape)
                    ((scrape_stub, track), confidence) = best_match
                    scrape_url: str = f'{self.genius_url}{scrape_stub[1:]}'

                    async with client.get(scrape_url,
                                          timeout=self.timeout,
                                          headers=self.headers) as scrape_request:
                        scrape_request.raise_for_status()
                        scrape_text: str|None = await scrape_request.text()

                        if len(scrape_text) < 100:
                            raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)")

                        html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser")
                        divs: ResultSet|None = html.find_all("div", {"data-lyrics-container": "true"})

                        if not divs:
                            return

                        for div in divs:
                            returned_lyrics += div.get_text()

                        returned_lyrics: str = self.datautils.scrub_lyrics(returned_lyrics)
                        artist: str = track.split(" - ", maxsplit=1)[0]
                        song: str = track.split(" - ", maxsplit=1)[1]
                        logging.info("Result found on %s", self.label)
                        time_end: float = time.time()
                        time_diff: float = time_end - time_start
                        matched = LyricsResult(artist=artist,
                                               song=song,
                                               src=self.label,
                                               lyrics=returned_lyrics,
                                               confidence=confidence,
                                               time=time_diff)
                        await self.redis_cache.increment_found_count(self.label)
                        await self.cache.store(matched)
                        return matched

        except:
            # if log_level == "DEBUG":
            traceback.print_exc()
            return
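A hedged sketch (not part of the commit; performs live requests against the configured private.GENIUS_URL) of calling the Genius source directly:

    import asyncio
    from lyric_search.sources.genius import Genius

    async def main():
        result = await Genius().search("artist", "song")
        if result:
            print(result.lyrics[:120])

    asyncio.run(main())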
129
lyric_search/sources/lrclib.py
Normal file
@@ -0,0 +1,129 @@
#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught, wrong-import-position

import sys
import time
sys.path.insert(1, '..')
import traceback
import logging
from typing import Optional
from aiohttp import ClientTimeout, ClientSession
from lyric_search import utils
from lyric_search.constructors import LyricsResult
from . import common, cache, redis_cache

logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)

class InvalidResponseException(Exception):
    """
    Invalid Response Exception
    """

class LRCLib:
    """LRCLib Search Module"""
    def __init__(self) -> None:
        self.label: str = "LRCLib"
        self.lrclib_url: str = "https://lrclib.net/api/search"
        self.headers: dict = common.SCRAPE_HEADERS
        self.timeout = ClientTimeout(connect=2, sock_read=4)
        self.datautils = utils.DataUtils()
        self.matcher = utils.TrackMatcher()
        self.cache = cache.Cache()
        self.redis_cache = redis_cache.RedisCache()

    async def search(self, artist: str, song: str, plain: bool = True) -> Optional[LyricsResult]:
        """
        LRCLib Search
        Args:
            artist (str): the artist to search
            song (str): the song to search
            plain (bool): search for plain lyrics (lrc otherwise)
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
        """
        try:
            artist: str = artist.strip().lower()
            song: str = song.strip().lower()
            time_start: float = time.time()
            lrc_obj: Optional[list[dict]] = None

            logging.info("Searching %s - %s on %s",
                         artist, song, self.label)

            input_track: str = f"{artist} - {song}"
            returned_lyrics: str = ''
            async with ClientSession() as client:
                async with await client.get(self.lrclib_url,
                                            params={
                                                'artist_name': artist,
                                                'track_name': song,
                                            },
                                            timeout=self.timeout,
                                            headers=self.headers) as request:
                    request.raise_for_status()
                    text: str|None = await request.text()

                    if len(text) < 100:
                        raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")

                    search_data: list|None = await request.json()

                    if not isinstance(search_data, list):
                        raise InvalidResponseException("Invalid JSON.")

                    if plain:
                        possible_matches = [(x, f"{result.get('artistName')} - {result.get('trackName')}")
                                            for x, result in enumerate(search_data)]
                    else:
                        logging.info("Limiting possible matches to only those with non-null syncedLyrics")
                        possible_matches = [(x, f"{result.get('artistName')} - {result.get('trackName')}")
                                            for x, result in enumerate(search_data) if isinstance(result['syncedLyrics'], str)]

                    best_match = self.matcher.find_best_match(input_track,
                                                              possible_matches)
                    if not best_match:
                        return
                    best_match_id = best_match[0][0]  # index into search_data of the matched candidate

                    if not isinstance(search_data[best_match_id]['artistName'], str):
                        raise InvalidResponseException(f"Invalid JSON: Cannot find artistName key.\n{search_data}")

                    if not isinstance(search_data[best_match_id]['trackName'], str):
                        raise InvalidResponseException(f"Invalid JSON: Cannot find trackName key.\n{search_data}")

                    returned_artist: str = search_data[best_match_id]['artistName']
                    returned_song: str = search_data[best_match_id]['trackName']
                    if plain:
                        if not isinstance(search_data[best_match_id]['plainLyrics'], str):
                            raise InvalidResponseException(f"Invalid JSON: Cannot find plainLyrics key.\n{search_data}")
                        returned_lyrics: str = search_data[best_match_id]['plainLyrics']
                        returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
                    else:
                        if not isinstance(search_data[best_match_id]['syncedLyrics'], str):
                            raise InvalidResponseException(f"Invalid JSON: Cannot find syncedLyrics key.\n{search_data}")
                        returned_lyrics: str = search_data[best_match_id]['syncedLyrics']
                        lrc_obj = self.datautils.create_lrc_object(returned_lyrics)
                    returned_track: str = f"{returned_artist} - {returned_song}"
                    (_matched, confidence) = self.matcher.find_best_match(input_track=input_track,
                                                                          candidate_tracks=[(0, returned_track)])
                    if not confidence:
                        return  # No suitable match found
                    logging.info("Result found on %s", self.label)
                    time_end: float = time.time()
                    time_diff: float = time_end - time_start
                    matched = LyricsResult(artist=returned_artist,
                                           song=returned_song,
                                           src=self.label,
                                           lyrics=returned_lyrics if plain else lrc_obj,
                                           confidence=confidence,
                                           time=time_diff)
                    await self.redis_cache.increment_found_count(self.label)
                    await self.cache.store(matched)
                    return matched
        except:
            traceback.print_exc()
            return
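A hedged sketch (not part of the commit; performs a live request to lrclib.net) requesting synced lyrics, where result.lyrics is the LRC object built by DataUtils.create_lrc_object:

    import asyncio
    from lyric_search.sources.lrclib import LRCLib

    async def main():
        result = await LRCLib().search("artist", "song", plain=False)
        if result:
            print(result.lyrics[:3])  # list of {'timeTag': ..., 'words': ...} dicts

    asyncio.run(main())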
214
lyric_search/sources/redis_cache.py
Normal file
@@ -0,0 +1,214 @@
#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught, wrong-import-order
# pylint: disable=wrong-import-position

import logging
import traceback
import json
import time
import sys
import regex
sys.path.insert(1, '..')
from lyric_search import notifier
from lyric_search.constructors import LyricsResult
import redis.asyncio as redis
from redis.commands.search.query import Query
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.field import TextField, TagField
from redis.commands.json.path import Path
from . import private

logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)

class RedisException(Exception):
    """
    Redis Exception
    """

class RedisCache:
    """
    Redis Cache Methods
    """

    def __init__(self) -> None:
        self.redis_client = redis.Redis(password=private.REDIS_PW)
        self.notifier = notifier.DiscordNotifier()
        self.notify_warnings = True
        self.regexes = [
            regex.compile(r'\-'),
            regex.compile(r'[^a-zA-Z0-9\s]'),
        ]

    async def create_index(self) -> None:
        """Create Index"""
        try:
            schema = (
                TextField("$.search_artist", as_name="artist"),
                TextField("$.search_song", as_name="song"),
                TextField("$.src", as_name="src"),
                TextField("$.lyrics", as_name="lyrics")
            )
            result = await self.redis_client.ft().create_index(
                schema, definition=IndexDefinition(prefix=["lyrics:"], index_type=IndexType.JSON))
            if str(result) != "OK":
                raise RedisException(f"Redis: Failed to create index: {result}")
        except Exception as e:
            await self.notifier.send(f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}", f"Failed to create idx: {str(e)}")

    def sanitize_input(self, artist: str, song: str, fuzzy: bool = False) -> tuple[str, str]:
        """
        Sanitize artist/song input (convert to redis matchable fuzzy query)
        Args:
            artist: Input artist
            song: Input song
            fuzzy: Whether to create fuzzy query str
        Returns:
            tuple[str, str]: Tuple containing the 2 output strings (artist, song)
        """
        artist = self.regexes[0].sub("", artist)
        artist = self.regexes[1].sub("", artist).strip()
        song = self.regexes[0].sub("", song)
        song = self.regexes[1].sub("", song).strip()
        if fuzzy:
            artist = " ".join([f"(%{artist_word}%)" for artist_word in artist.split(" ")])
            song = " ".join([f"(%{song_word}%)" for song_word in song.split(" ")])
        return (artist, song)

    async def increment_found_count(self, src: str) -> None:
        """
        Increment the found count for a source
        Args:
            src (str): The source to increment
        Returns:
            None
        """
        try:
            src = src.strip().lower()
            await self.redis_client.incr(f"returned:{src}")
        except Exception as e:
            await self.notifier.send(f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}", f"{str(e)}")
            traceback.print_exc()

    async def get_found_counts(self) -> dict:
        """
        Get found counts for all sources (and failed count)
        Args:
            None
        Returns:
            dict: In the form {'source': count, 'source2': count, ...}
        """
        try:
            sources: list = ["cache", "lrclib", "genius", "failed"]
            counts: dict = {}
            for src in sources:
                src_found_count = await self.redis_client.get(f"returned:{src}")
                counts[src] = src_found_count
            return counts
        except Exception as e:
            await self.notifier.send(f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}", f"{str(e)}")
            traceback.print_exc()

    async def search(self, **kwargs) -> list[tuple]:
        """
        Search Redis Cache
        Args:
            artist (Optional[str]): artist to search
            song (Optional[str]): song to search
            lyrics (Optional[str]): lyrics to search (optional, used in place of artist/song if provided)
        Returns:
            list[tuple]: List of redis results; each tuple's first value is the redis key, the second is the returned data
        """

        try:
            artist = kwargs.get('artist', '')
            song = kwargs.get('song', '')
            lyrics = kwargs.get('lyrics')
            is_random_search = artist == "!" and song == "!"

            if lyrics:
                # to code later
                raise RedisException("Lyric search not yet implemented")

            if not is_random_search:
                logging.debug("Redis: Searching normally first")
                (artist, song) = self.sanitize_input(artist, song)
                logging.debug("Seeking: %s - %s", artist, song)
                search_res = await self.redis_client.ft().search(Query(
                    f"@artist:{artist} @song:{song}"
                ))
                search_res_out = [(result['id'].split(":",
                                                      maxsplit=1)[1], dict(json.loads(result['json'])))
                                  for result in search_res.docs]
                if not search_res_out:
                    logging.debug("Redis: Normal search failed, trying with fuzzy search")

                    (fuzzy_artist, fuzzy_song) = self.sanitize_input(artist=artist,
                                                                     song=song, fuzzy=True)
                    search_res = await self.redis_client.ft().search(Query(
                        f"@artist:{fuzzy_artist} @song:{fuzzy_song}"
                    ))
                    search_res_out = [(result['id'].split(":",
                                                          maxsplit=1)[1], dict(json.loads(result['json'])))
                                      for result in search_res.docs]

            else:
                random_redis_key = await self.redis_client.randomkey()
                out_id = str(random_redis_key).split(":",
                                                     maxsplit=1)[1][:-1]
                search_res = await self.redis_client.json().get(random_redis_key)
                search_res_out = [(out_id, search_res)]

            if not search_res_out and self.notify_warnings:
                await self.notifier.send("WARNING", f"Redis cache miss for: \n## *{artist} - {song}*")
            return search_res_out
        except Exception as e:
            await self.notifier.send(f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}", f"{str(e)}\nSearch was: {artist} - {song}")
            traceback.print_exc()

    async def redis_store(self, sqlite_id: int, lyr_result: LyricsResult) -> None:
        """
        Store lyrics to redis cache
        Args:
            sqlite_id (int): the row id of the related SQLite db insertion
            lyr_result (LyricsResult): the returned lyrics to cache
        Returns:
            None
        """
        try:
            (search_artist, search_song) = self.sanitize_input(lyr_result.artist,
                                                               lyr_result.song)
            redis_mapping = {
                'id': sqlite_id,
                'src': lyr_result.src,
                'date_retrieved': time.time(),
                'artist': lyr_result.artist,
                'search_artist': search_artist,
                'search_song': search_song,
                'search_artistsong': f'{search_artist}\n{search_song}',
                'song': lyr_result.song,
                'artistsong': f"{lyr_result.artist}\n{lyr_result.song}",
                'confidence': lyr_result.confidence,
                'lyrics': lyr_result.lyrics,
                'tags': '(none)',
                'liked': 0,
            }
            newkey = f"lyrics:000{sqlite_id}"
            jsonset = await self.redis_client.json().set(newkey, Path.root_path(),
                                                         redis_mapping)
            if not jsonset:
                raise RedisException(f"Failed to store {lyr_result.artist} - {lyr_result.song} (SQLite id: {sqlite_id}) to redis:\n{jsonset}")
            logging.info("Stored %s - %s (related SQLite Row ID: %s) to %s",
                         lyr_result.artist, lyr_result.song, sqlite_id, newkey)
            await self.notifier.send("INFO",
                                     f"Stored {lyr_result.artist} - {lyr_result.song} (related SQLite Row ID: {sqlite_id}) to redis: {newkey}")
        except Exception as e:
            await self.notifier.send(f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
                                     f"Failed to store {lyr_result.artist} - {lyr_result.song}\
                                         (SQLite id: {sqlite_id}) to Redis:\n{str(e)}")
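A quick sketch (not part of the commit; assumes the private credentials module is importable) of what sanitize_input produces for a RediSearch query:

    from lyric_search.sources.redis_cache import RedisCache

    rc = RedisCache()
    print(rc.sanitize_input("a-ha", "take on me!"))
    # -> ('aha', 'take on me')
    print(rc.sanitize_input("a-ha", "take on me!", fuzzy=True))
    # -> ('(%aha%)', '(%take%) (%on%) (%me%)')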
148
lyric_search/utils.py
Normal file
@@ -0,0 +1,148 @@
#!/usr/bin/env python3.12

from difflib import SequenceMatcher
from typing import List, Optional, Tuple
import logging
import regex

class TrackMatcher:
    """Track Matcher"""
    def __init__(self, threshold: float = 0.85):
        """
        Initialize the TrackMatcher with a similarity threshold.

        Args:
            threshold (float): Minimum similarity score to consider a match valid
                               (between 0 and 1, default 0.85)
        """
        self.threshold = threshold

    def find_best_match(self, input_track: str, candidate_tracks: List[tuple[int|str, str]]) -> Optional[Tuple[tuple, int]]:
        """
        Find the best matching track from the candidate list.

        Args:
            input_track (str): Input track in "ARTIST - SONG" format
            candidate_tracks (List[tuple[int|str, str]]): List of candidate tracks

        Returns:
            Optional[Tuple[tuple, int]]: Tuple of (best matching candidate tuple,
                similarity score as an int percentage), or None if no good match found
        """

        if not input_track or not candidate_tracks:
            return None

        # Normalize input track
        input_track = self._normalize_string(input_track)

        best_match = None
        best_score = 0

        for candidate in candidate_tracks:
            normalized_candidate = self._normalize_string(candidate[1])

            # Calculate various similarity scores
            exact_score = 1.0 if input_track == normalized_candidate else 0.0
            sequence_score = SequenceMatcher(None, input_track, normalized_candidate).ratio()
            token_score = self._calculate_token_similarity(input_track, normalized_candidate)

            # Take the maximum of the different scoring methods
            final_score = max(exact_score, sequence_score, token_score)

            if final_score > best_score:
                best_score = final_score
                best_match = candidate

        # Return the match only if it meets the threshold
        return (best_match, round(best_score * 100)) if best_score >= self.threshold else None

    def _normalize_string(self, text: str) -> str:
        """
        Normalize string for comparison by removing special characters,
        extra spaces, and converting to lowercase.
        Args:
            text (str): The text to normalize
        Returns:
            str: Normalized text
        """
        # Remove special characters and convert to lowercase
        text = regex.sub(r'[^\w\s-]', '', text).lower()
        # Normalize spaces
        text = ' '.join(text.split())
        return text

    def _calculate_token_similarity(self, str1: str, str2: str) -> float:
        """
        Calculate similarity based on matching tokens (words).
        Args:
            str1 (str): string 1 to compare
            str2 (str): string 2 to compare
        Returns:
            float: The token similarity score
        """
        tokens1 = set(str1.split())
        tokens2 = set(str2.split())

        if not tokens1 or not tokens2:
            return 0.0

        intersection = tokens1.intersection(tokens2)
        union = tokens1.union(tokens2)

        return len(intersection) / len(union)

class DataUtils:
    """
    Data Utils
    """

    def __init__(self):
        self.lrc_regex = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}')

    def scrub_lyrics(self, lyrics: str) -> str:
        """
        Lyric Scrub Regex Chain
        Args:
            lyrics (str): The lyrics to scrub
        Returns:
            str: Regex scrubbed lyrics
        """
        lyrics = regex.sub(r'(\[.*?\])(\s){0,}(\:){0,1}', '', lyrics)
        lyrics = regex.sub(r'(\d?)(Embed\b)', '', lyrics, flags=regex.IGNORECASE)
        lyrics = regex.sub(r'\n{2}', '\n', lyrics)  # Gaps between verses
        lyrics = regex.sub(r'[0-9]\b$', '', lyrics)
        return lyrics

    def create_lrc_object(self, lrc_str: str) -> list[dict]:
        """
        Create LRC Object
        Args:
            lrc_str (str): The raw LRCLib syncedLyrics
        Returns:
            list[dict]: LRC Object comprised of timestamps/lyrics
        """
        lrc_out: list = []
        for line in lrc_str.split("\n"):
            _timetag = None
            _words = None
            if not line.strip():
                continue
            reg_helper = regex.findall(self.lrc_regex, line.strip())
            if not reg_helper:
                continue
            reg_helper = reg_helper[0]
            logging.debug("Reg helper: %s for line: %s; len: %s",
                          reg_helper, line, len(reg_helper))
            _timetag = reg_helper[0]
            if not reg_helper[1].strip():
                _words = "♪"
            else:
                _words = reg_helper[1].strip()
            lrc_out.append({
                "timeTag": _timetag,
                "words": _words,
            })
        return lrc_out
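A short sketch (not part of the commit) exercising the pure helpers in utils.py; the outputs shown follow from the code above:

    from lyric_search.utils import TrackMatcher, DataUtils

    matcher = TrackMatcher()
    print(matcher.find_best_match("artist - song",
                                  [(1, "Artist - Song"), (2, "Other - Track")]))
    # -> ((1, 'Artist - Song'), 100)

    print(DataUtils().create_lrc_object("[00:12.34] Hello world"))
    # -> [{'timeTag': '00:12', 'words': 'Hello world'}]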