#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught, wrong-import-order, wrong-import-position
import sys
sys.path.insert(1, '..')
import traceback
import logging
import time
from typing import Optional
from aiohttp import ClientTimeout, ClientSession
from bs4 import BeautifulSoup, ResultSet
import html as htm
from . import private
from . import common
from . import cache
from lyric_search_new import utils
from lyric_search_new.constructors import LyricsResult

logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)

class InvalidResponseException(Exception):
"""
2025-01-14 11:10:13 -05:00
InvalidResponseException
2025-01-13 20:47:39 -05:00
"""
2025-01-12 20:19:48 -05:00
class Genius:
"""Genius Search Module"""
def __init__(self):
2025-01-14 14:17:18 -05:00
self.label: str = "Genius"
self.genius_url: str = private.GENIUS_URL
self.genius_search_url: str = f'{self.genius_url}api/search/song?q='
self.headers: dict = common.SCRAPE_HEADERS
2025-01-16 07:14:36 -05:00
self.timeout = ClientTimeout(connect=3, sock_read=5)
2025-01-13 20:47:39 -05:00
self.datautils = utils.DataUtils()
self.matcher = utils.TrackMatcher()
2025-01-15 20:17:49 -05:00
self.cache = cache.Cache()
2025-01-12 20:19:48 -05:00
2025-01-16 09:37:50 -05:00
    # pylint: disable=unused-argument
    async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
"""
@artist: the artist to search
@song: the song to search
2025-01-13 20:47:39 -05:00
"""
try:
2025-01-16 09:37:50 -05:00
# pylint: enable=unused-argument
2025-01-14 14:17:18 -05:00
            artist: str = artist.strip().lower()
            song: str = song.strip().lower()
            time_start: float = time.time()
            logging.info("Searching %s - %s on %s",
                         artist, song, self.label)
            search_term: str = f'{artist}%20{song}'
            returned_lyrics: str = ''
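            # Query the Genius search API; spaces in the term are hand-encoded as '%20'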
            async with ClientSession() as client:
                async with client.get(f'{self.genius_search_url}{search_term}',
                                      timeout=self.timeout,
                                      headers=self.headers) as request:
                    request.raise_for_status()
                    text: str|None = await request.text()
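                    # Reject empty/truncated responses, then verify the JSON has the expected shape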
                    if len(text) < 100:
                        raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
                    search_data = await request.json()
                    if not isinstance(search_data, dict):
                        raise InvalidResponseException("Invalid JSON.")
                    if not isinstance(search_data['response'], dict):
                        raise InvalidResponseException(f"Invalid JSON: Cannot find response key.\n{search_data}")
                    if not isinstance(search_data['response']['sections'], list):
                        raise InvalidResponseException(f"Invalid JSON: Cannot find response->sections key.\n{search_data}")
                    if not isinstance(search_data['response']['sections'][0]['hits'], list):
                        raise InvalidResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.")
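                    # Build (path, "Artist - Title") candidate tuples for the matcher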
                    possible_matches: list = search_data['response']['sections'][0]['hits']
                    to_scrape: list[tuple] = [
                        (
                            returned['result']['path'],
                            f"{returned['result']['artist_names']} - {returned['result']['title']}",
                        ) for returned in possible_matches
                    ]
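                    # Fuzzy-match the requested track against the candidates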
searched: str = f"{artist} - {song}"
best_match: tuple = self.matcher.find_best_match(input_track=searched,
2025-01-13 20:47:39 -05:00
candidate_tracks=to_scrape)
((scrape_stub, track), confidence) = best_match
2025-01-14 14:17:18 -05:00
scrape_url: str = f'{self.genius_url}{scrape_stub[1:]}'
2025-01-13 20:47:39 -05:00
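                    # Fetch the matched song's page (scrape_stub's leading '/' is dropped; genius_url supplies it)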
                    async with client.get(scrape_url,
                                          timeout=self.timeout,
                                          headers=self.headers) as scrape_request:
                        scrape_request.raise_for_status()
                        scrape_text: str|None = await scrape_request.text()
                        if len(scrape_text) < 100:
                            raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)")

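                        # Unescape HTML entities and convert <br/> to newlines so get_text() keeps line breaks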
                        html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser")
                        divs: ResultSet|None = html.find_all("div", {"data-lyrics-container": "true"})
                        if not divs:
                            return
                        for div in divs:
                            returned_lyrics += div.get_text()
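                        # Scrub the lyrics and take artist/song from the matched track title,
                        # which reflects Genius's canonical naming rather than the raw query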
                        returned_lyrics: str = self.datautils.scrub_lyrics(returned_lyrics)
                        artist, song = track.split(" - ", maxsplit=1)
logging.info("Result found on %s", self.label)
2025-01-15 20:17:49 -05:00
time_end: float = time.time()
time_diff: float = time_end - time_start
matched = LyricsResult(artist=artist,
2025-01-13 20:47:39 -05:00
song=song,
src=self.label,
lyrics=returned_lyrics,
2025-01-15 20:17:49 -05:00
confidence=confidence,
time=time_diff)
await self.cache.store(matched)
return matched
2025-01-13 20:47:39 -05:00
        except:
            # if log_level == "DEBUG":
            traceback.print_exc()
            return
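
# Usage sketch (illustrative, not part of the module): because of the relative
# imports above, Genius must be imported from within its package; the package
# path below is an assumption for illustration only.
#
#   import asyncio
#   from lyric_search_new.sources.genius import Genius  # hypothetical path
#
#   result = asyncio.run(Genius().search("the beatles", "yesterday"))
#   if result:
#       print(result.lyrics)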