This commit is contained in:
codey 2025-01-14 14:17:18 -05:00
parent d782451104
commit 06581c1fce
4 changed files with 56 additions and 53 deletions

View File

@ -18,7 +18,7 @@ class Aggregate:
def __init__(self, exclude_methods=None): def __init__(self, exclude_methods=None):
if not exclude_methods: if not exclude_methods:
exclude_methods = [] exclude_methods: list = []
self.exclude_methods = exclude_methods self.exclude_methods = exclude_methods
async def search(self, artist: str, song: str) -> Optional[LyricsResult]: async def search(self, artist: str, song: str) -> Optional[LyricsResult]:
@ -27,10 +27,10 @@ class Aggregate:
cache_search = cache.Cache() cache_search = cache.Cache()
genius_search = genius.Genius() genius_search = genius.Genius()
lrclib_search = lrclib.LRCLib() lrclib_search = lrclib.LRCLib()
sources = [cache_search, sources: list = [cache_search,
lrclib_search, lrclib_search,
genius_search] genius_search]
search_result = None search_result: Optional[LyricsResult] = None
for source in sources: for source in sources:
if source.label.lower() in self.exclude_methods: if source.label.lower() in self.exclude_methods:
logging.info("Skipping source: %s, excluded.", source.label) logging.info("Skipping source: %s, excluded.", source.label)

View File

@ -18,17 +18,17 @@ log_level = logging.getLevelName(logger.level)
class Cache: class Cache:
"""Cache Search Module""" """Cache Search Module"""
def __init__(self): def __init__(self):
self.cache_db = os.path.join("/", "var", self.cache_db: str = os.path.join("/", "var",
"lib", "singerdbs", "lib", "singerdbs",
"cached_lyrics.db") "cached_lyrics.db")
self.cache_pre_query = "pragma journal_mode = WAL; pragma synchronous = normal; pragma temp_store = memory; pragma mmap_size = 30000000000;" self.cache_pre_query: str = "pragma journal_mode = WAL; pragma synchronous = normal; pragma temp_store = memory; pragma mmap_size = 30000000000;"
self.sqlite_exts = ['/usr/local/lib/python3.11/dist-packages/spellfix1.cpython-311-x86_64-linux-gnu.so'] self.sqlite_exts: list[str] = ['/usr/local/lib/python3.11/dist-packages/spellfix1.cpython-311-x86_64-linux-gnu.so']
self.label = "Cache" self.label: str = "Cache"
def get_matched(self, sqlite_rows, matched_candidate, confidence) -> Optional[LyricsResult]: def get_matched(self, sqlite_rows: list[sqlite3.Row], matched_candidate: tuple, confidence: float) -> Optional[LyricsResult]:
"""Get Matched Result""" """Get Matched Result"""
matched_id = matched_candidate[0] matched_id: int = matched_candidate[0]
for row in sqlite_rows: for row in sqlite_rows:
if row[0] == matched_id: if row[0] == matched_id:
(_id, artist, song, lyrics, original_src, _confidence) = row (_id, artist, song, lyrics, original_src, _confidence) = row
@ -40,7 +40,7 @@ class Cache:
confidence=confidence) confidence=confidence)
return None return None
async def search(self, artist: str, song: str): async def search(self, artist: str, song: str) -> Optional[LyricsResult]:
""" """
@artist: the artist to search @artist: the artist to search
@song: the song to search @song: the song to search
@ -48,8 +48,8 @@ class Cache:
- LyricsResult corresponding to nearest match found (if found), **None** otherwise - LyricsResult corresponding to nearest match found (if found), **None** otherwise
""" """
try: try:
artist = artist.strip().lower() artist: str = artist.strip().lower()
song = song.strip().lower() song: str = song.strip().lower()
logging.info("Searching %s - %s on %s", logging.info("Searching %s - %s on %s",
artist, song, self.label) artist, song, self.label)
async with sqlite3.connect(self.cache_db, timeout=2) as db_conn: async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
@ -57,20 +57,20 @@ class Cache:
for ext in self.sqlite_exts: for ext in self.sqlite_exts:
await db_conn.load_extension(ext) await db_conn.load_extension(ext)
async with await db_conn.executescript(self.cache_pre_query) as _db_cursor: async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
search_query = 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\ search_query: str = 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\
WHERE editdist3((artist || " " || song), (? || " " || ?))\ WHERE editdist3((artist || " " || song), (? || " " || ?))\
<= 410 ORDER BY editdist3((artist || " " || song), ?) ASC LIMIT 10' <= 410 ORDER BY editdist3((artist || " " || song), ?) ASC LIMIT 10'
search_params = (artist.strip(), song.strip(), search_params: tuple = (artist.strip(), song.strip(),
f"{artist.strip()} {song.strip()}") f"{artist.strip()} {song.strip()}")
async with await _db_cursor.execute(search_query, search_params) as db_cursor: async with await _db_cursor.execute(search_query, search_params) as db_cursor:
results = await db_cursor.fetchall() results: list = await db_cursor.fetchall()
result_tracks = [] result_tracks: list = []
for track in results: for track in results:
(_id, _artist, _song, _lyrics, _src, _confidence) = track (_id, _artist, _song, _lyrics, _src, _confidence) = track
result_tracks.append((_id, f"{_artist} - {_song}")) result_tracks.append((_id, f"{_artist} - {_song}"))
input_track = f"{artist} - {song}" input_track: str = f"{artist} - {song}"
matcher = utils.TrackMatcher() matcher = utils.TrackMatcher()
best_match = matcher.find_best_match(input_track=input_track, best_match: tuple|None = matcher.find_best_match(input_track=input_track,
candidate_tracks=result_tracks) candidate_tracks=result_tracks)
if not best_match: if not best_match:
return None return None

View File

@ -5,8 +5,9 @@ import sys
sys.path.insert(1,'..') sys.path.insert(1,'..')
import traceback import traceback
import logging import logging
from typing import Optional
from aiohttp import ClientTimeout, ClientSession from aiohttp import ClientTimeout, ClientSession
from bs4 import BeautifulSoup from bs4 import BeautifulSoup, ResultSet
import html as htm import html as htm
from . import private from . import private
from . import common from . import common
@ -24,32 +25,32 @@ class InvalidResponseException(Exception):
class Genius: class Genius:
"""Genius Search Module""" """Genius Search Module"""
def __init__(self): def __init__(self):
self.label = "Genius" self.label: str = "Genius"
self.genius_url = private.GENIUS_URL self.genius_url: str = private.GENIUS_URL
self.genius_search_url = f'{self.genius_url}api/search/song?q=' self.genius_search_url: str = f'{self.genius_url}api/search/song?q='
self.headers = common.SCRAPE_HEADERS self.headers: dict = common.SCRAPE_HEADERS
self.timeout = ClientTimeout(connect=2, sock_read=4) self.timeout = ClientTimeout(connect=2, sock_read=4)
self.datautils = utils.DataUtils() self.datautils = utils.DataUtils()
self.matcher = utils.TrackMatcher() self.matcher = utils.TrackMatcher()
async def search(self, artist: str, song: str): async def search(self, artist: str, song: str) -> Optional[LyricsResult]:
""" """
@artist: the artist to search @artist: the artist to search
@song: the song to search @song: the song to search
""" """
try: try:
artist = artist.strip().lower() artist: str = artist.strip().lower()
song = song.strip().lower() song: str = song.strip().lower()
logging.info("Searching %s - %s on %s", logging.info("Searching %s - %s on %s",
artist, song, self.label) artist, song, self.label)
search_term = f'{artist}%20{song}' search_term: str = f'{artist}%20{song}'
returned_lyrics = '' returned_lyrics: str = ''
async with ClientSession() as client: async with ClientSession() as client:
async with client.get(f'{self.genius_search_url}{search_term}', async with client.get(f'{self.genius_search_url}{search_term}',
timeout=self.timeout, timeout=self.timeout,
headers=self.headers) as request: headers=self.headers) as request:
request.raise_for_status() request.raise_for_status()
text = await request.text() text: str|None = await request.text()
if len(text) < 100: if len(text) < 100:
raise InvalidResponseException("Search response text was invalid (len < 100 chars.)") raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
@ -67,30 +68,31 @@ class Genius:
if not isinstance(search_data['response']['sections'][0]['hits'], list): if not isinstance(search_data['response']['sections'][0]['hits'], list):
raise InvalidResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.") raise InvalidResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.")
possible_matches = search_data['response']['sections'][0]['hits'] possible_matches: list = search_data['response']['sections'][0]['hits']
to_scrape = [ to_scrape: list[tuple] = [
( (
returned['result']['path'], returned['result']['path'],
f'{returned['result']['artist_names']} - {returned['result']['title']}', f'{returned['result']['artist_names']} - {returned['result']['title']}',
) for returned in possible_matches ) for returned in possible_matches
] ]
searched = f"{artist} - {song}" searched: str = f"{artist} - {song}"
best_match = self.matcher.find_best_match(input_track=searched, best_match: tuple = self.matcher.find_best_match(input_track=searched,
candidate_tracks=to_scrape) candidate_tracks=to_scrape)
((scrape_stub, track), confidence) = best_match ((scrape_stub, track), confidence) = best_match
scrape_url = f'{self.genius_url}{scrape_stub[1:]}' scrape_url: str = f'{self.genius_url}{scrape_stub[1:]}'
async with client.get(scrape_url, async with client.get(scrape_url,
timeout=self.timeout, timeout=self.timeout,
headers=self.headers) as scrape_request: headers=self.headers) as scrape_request:
scrape_request.raise_for_status() scrape_request.raise_for_status()
scrape_text = await scrape_request.text() scrape_text: str|None = await scrape_request.text()
if len(scrape_text) < 100: if len(scrape_text) < 100:
raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)") raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)")
html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser") html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser")
divs = html.find_all("div", {"data-lyrics-container": "true"}) divs: ResultSet|None = html.find_all("div", {"data-lyrics-container": "true"})
if not divs: if not divs:
return return
@ -98,9 +100,9 @@ class Genius:
for div in divs: for div in divs:
returned_lyrics += div.get_text() returned_lyrics += div.get_text()
returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics) returned_lyrics: str = self.datautils.scrub_lyrics(returned_lyrics)
artist = track.split(" - ", maxsplit=1)[0] artist: str = track.split(" - ", maxsplit=1)[0]
song = track.split(" - ", maxsplit=1)[1] song: str = track.split(" - ", maxsplit=1)[1]
logging.info("Result found on %s", self.label) logging.info("Result found on %s", self.label)
return LyricsResult(artist=artist, return LyricsResult(artist=artist,
song=song, song=song,

View File

@ -5,6 +5,7 @@ import sys
sys.path.insert(1,'..') sys.path.insert(1,'..')
import traceback import traceback
import logging import logging
from typing import Optional
from aiohttp import ClientTimeout, ClientSession from aiohttp import ClientTimeout, ClientSession
from lyric_search_new import utils from lyric_search_new import utils
from lyric_search_new.constructors import LyricsResult from lyric_search_new.constructors import LyricsResult
@ -21,24 +22,24 @@ class InvalidResponseException(Exception):
class LRCLib: class LRCLib:
"""LRCLib Search Module""" """LRCLib Search Module"""
def __init__(self): def __init__(self):
self.label = "LRCLib" self.label: str = "LRCLib"
self.lrclib_url = "https://lrclib.net/api/get" self.lrclib_url: str = "https://lrclib.net/api/get"
self.headers = common.SCRAPE_HEADERS self.headers: dict = common.SCRAPE_HEADERS
self.timeout = ClientTimeout(connect=2, sock_read=4) self.timeout = ClientTimeout(connect=2, sock_read=4)
self.datautils = utils.DataUtils() self.datautils = utils.DataUtils()
self.matcher = utils.TrackMatcher() self.matcher = utils.TrackMatcher()
async def search(self, artist: str, song: str): async def search(self, artist: str, song: str) -> Optional[LyricsResult]:
""" """
@artist: the artist to search @artist: the artist to search
@song: the song to search @song: the song to search
""" """
try: try:
artist = artist.strip().lower() artist: str = artist.strip().lower()
song = song.strip().lower() song: str = song.strip().lower()
logging.info("Searching %s - %s on %s", logging.info("Searching %s - %s on %s",
artist, song, self.label) artist, song, self.label)
returned_lyrics = '' returned_lyrics: str = ''
async with ClientSession() as client: async with ClientSession() as client:
async with client.get(self.lrclib_url, async with client.get(self.lrclib_url,
params = { params = {
@ -48,12 +49,12 @@ class LRCLib:
timeout=self.timeout, timeout=self.timeout,
headers=self.headers) as request: headers=self.headers) as request:
request.raise_for_status() request.raise_for_status()
text = await request.text() text: str|None = await request.text()
if len(text) < 100: if len(text) < 100:
raise InvalidResponseException("Search response text was invalid (len < 100 chars.)") raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
search_data = await request.json() search_data: dict|None = await request.json()
if not isinstance(search_data, dict): if not isinstance(search_data, dict):
raise InvalidResponseException("Invalid JSON.") raise InvalidResponseException("Invalid JSON.")
@ -64,12 +65,12 @@ class LRCLib:
if not isinstance(search_data['trackName'], str): if not isinstance(search_data['trackName'], str):
raise InvalidResponseException(f"Invalid JSON: Cannot find trackName key.\n{search_data}") raise InvalidResponseException(f"Invalid JSON: Cannot find trackName key.\n{search_data}")
returned_artist = search_data['artistName'] returned_artist: str = search_data['artistName']
returned_song = search_data['trackName'] returned_song: str = search_data['trackName']
returned_lyrics = search_data['plainLyrics'] returned_lyrics: str = search_data['plainLyrics']
returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics) returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
input_track = f"{artist} - {song}" input_track: str = f"{artist} - {song}"
returned_track = f"{artist} - {song}" returned_track: str = f"{artist} - {song}"
(_matched, confidence) = self.matcher.find_best_match(input_track=input_track, (_matched, confidence) = self.matcher.find_best_match(input_track=input_track,
candidate_tracks=[(0, returned_track)]) candidate_tracks=[(0, returned_track)])
if not confidence: if not confidence: