#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught, wrong-import-order, wrong-import-position

import sys
sys.path.insert(1,'..')
import traceback
import logging
import time
from typing import Optional
from aiohttp import ClientTimeout, ClientSession
from bs4 import BeautifulSoup, ResultSet
import html as htm
from . import private, common, cache, redis_cache
from lyric_search import utils
from lyric_search.constructors import LyricsResult

logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)


class InvalidResponseException(Exception):
    """
    Raised when a Genius search or scrape response fails validation
    (body too short, or JSON not shaped as expected).
    """


class Genius:
    """Genius Search Module"""

    def __init__(self) -> None:
        """Set up the endpoint URLs, request settings and helper objects."""
        self.label: str = "Genius"
        self.genius_url: str = private.GENIUS_URL
        # The search term is appended directly to this URL.
        self.genius_search_url: str = f'{self.genius_url}api/search/song?q='
        self.headers: dict = common.SCRAPE_HEADERS
        self.timeout = ClientTimeout(connect=3, sock_read=5)
        self.datautils = utils.DataUtils()
        self.matcher = utils.TrackMatcher()
        self.cache = cache.Cache()
        self.redis_cache = redis_cache.RedisCache()

    # pylint: disable=unused-argument
    async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
        """
        Genius Search

        Queries the search endpoint, picks the best candidate with the track
        matcher, scrapes that candidate's page and returns its lyrics. On
        success the found-counter is incremented and the result is stored in
        the cache. Any ordinary failure (HTTP error, unexpected JSON, no
        lyrics container, ...) yields None.

        Args:
            artist (str): the artist to search
            song (str): the song to search
            **kwargs: accepted but not used.
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
        """
        # pylint: enable=unused-argument
        try:
            artist = artist.strip().lower()
            song = song.strip().lower()
            time_start: float = time.time()
            logging.info("Searching %s - %s on %s",
                         artist, song, self.label)
            search_term: str = f'{artist}%20{song}'

            async with ClientSession() as client:
                async with client.get(f'{self.genius_search_url}{search_term}',
                                      timeout=self.timeout,
                                      headers=self.headers) as request:
                    request.raise_for_status()
                    text: str|None = await request.text()

                    if len(text) < 100:
                        raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
                    search_data = await request.json()

                    # Validate the JSON shape level by level before indexing into it.
                    if not isinstance(search_data, dict):
                        raise InvalidResponseException("Invalid JSON.")

                    if not isinstance(search_data['response'], dict):
                        raise InvalidResponseException(f"Invalid JSON: Cannot find response key.\n{search_data}")

                    if not isinstance(search_data['response']['sections'], list):
                        raise InvalidResponseException(f"Invalid JSON: Cannot find response->sections key.\n{search_data}")

                    if not isinstance(search_data['response']['sections'][0]['hits'], list):
                        raise InvalidResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.")

                    possible_matches: list = search_data['response']['sections'][0]['hits']
                    # Each candidate is (page path, "artist - title").
                    to_scrape: list[tuple] = [
                        (
                            returned['result']['path'],
                            f"{returned['result']['artist_names']} - {returned['result']['title']}",
                        ) for returned in possible_matches
                    ]
                    searched: str = f"{artist} - {song}"
                    best_match = self.matcher.find_best_match(input_track=searched,
                                                              candidate_tracks=to_scrape)
                    if not best_match:
                        # Nothing usable came back from the matcher.
                        return None
                    ((scrape_stub, track), confidence) = best_match
                    # Drop the stub's first character before joining it to the base URL.
                    scrape_url: str = f'{self.genius_url}{scrape_stub[1:]}'

                    async with client.get(scrape_url,
                                          timeout=self.timeout,
                                          headers=self.headers) as scrape_request:
                        scrape_request.raise_for_status()
                        scrape_text: str|None = await scrape_request.text()

                        if len(scrape_text) < 100:
                            raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)")

                        # NOTE(review): in the file as received, the first argument of
                        # replace() was a raw line break inside the quotes (a syntax
                        # error). Restored as '<br/>' so that line-break tags become
                        # newlines before get_text() -- confirm against history.
                        html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser")
                        divs: ResultSet|None = html.find_all("div", {"data-lyrics-container": "true"})

                        if not divs:
                            return None

                        raw_lyrics: str = "".join(div.get_text() for div in divs)
                        returned_lyrics: str = self.datautils.scrub_lyrics(raw_lyrics)

                        # Report the artist/title of the matched candidate, not the query.
                        matched_artist, matched_song = track.split(" - ", maxsplit=1)
                        logging.info("Result found on %s", self.label)
                        time_end: float = time.time()
                        time_diff: float = time_end - time_start
                        matched = LyricsResult(artist=matched_artist,
                                               song=matched_song,
                                               src=self.label,
                                               lyrics=returned_lyrics,
                                               confidence=confidence,
                                               time=time_diff)
                        await self.redis_cache.increment_found_count(self.label)
                        await self.cache.store(matched)
                        return matched

        except Exception:
            # Best-effort source: an ordinary failure means "no result".
            # Catching Exception (rather than a bare except) lets
            # asyncio.CancelledError and KeyboardInterrupt propagate.
            # if log_level == "DEBUG":
            traceback.print_exc()
            return None