
import sys
sys.path.insert(1, "..")
import traceback
import logging
import time
import re
from typing import Optional
from aiohttp import ClientTimeout, ClientSession
from bs4 import BeautifulSoup, ResultSet # type: ignore
import html as htm
from . import private, common, cache, redis_cache
from lyric_search import utils
from lyric_search.constructors import LyricsResult, InvalidGeniusResponseException

logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)


class Genius:
    """
    Genius Search Module
    """

    def __init__(self) -> None:
        self.label: str = "Genius"
        self.genius_url: str = private.GENIUS_URL
        self.genius_search_url: str = f"{self.genius_url}api/search/song?q="
        self.headers: dict = common.SCRAPE_HEADERS
        self.timeout = ClientTimeout(connect=3, sock_read=5)
        self.datautils = utils.DataUtils()
        self.matcher = utils.TrackMatcher()
        self.cache = cache.Cache()
        self.redis_cache = redis_cache.RedisCache()

    async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
        """
        Genius Search

        Args:
            artist (str): the artist to search
            song (str): the song to search

        Returns:
            Optional[LyricsResult]: The result, if found - None otherwise.
        """
        try:
            artist = artist.strip().lower()
            song = song.strip().lower()
            time_start: float = time.time()
            logging.info("Searching %s - %s on %s", artist, song, self.label)
            search_term: str = f"{artist}%20{song}"
            returned_lyrics: str = ""
            async with ClientSession() as client:
                async with client.get(
                    f"{self.genius_search_url}{search_term}",
                    timeout=self.timeout,
                    headers=self.headers,
                ) as request:
                    request.raise_for_status()
                    text: Optional[str] = await request.text()
                    if not text:
                        raise InvalidGeniusResponseException("No search response.")
                    if len(text) < 100:
                        raise InvalidGeniusResponseException(
                            "Search response text was invalid (len < 100 chars.)"
                        )
                    search_data = await request.json()
                    if not isinstance(search_data, dict):
                        raise InvalidGeniusResponseException("Invalid JSON.")
                    # Use .get() so a missing key raises the intended
                    # InvalidGeniusResponseException instead of a bare KeyError.
                    response = search_data.get("response")
                    if not isinstance(response, dict):
                        raise InvalidGeniusResponseException(
                            f"Invalid JSON: Cannot find response key.\n{search_data}"
                        )
                    sections = response.get("sections")
                    if not isinstance(sections, list):
                        raise InvalidGeniusResponseException(
                            f"Invalid JSON: Cannot find response->sections key.\n{search_data}"
                        )
                    if not sections or not isinstance(sections[0].get("hits"), list):
                        raise InvalidGeniusResponseException(
                            "Invalid JSON: Cannot find response->sections[0]->hits key."
                        )
                    possible_matches: list = sections[0]["hits"]
                    to_scrape: list[tuple] = [
                        (
                            returned["result"]["path"],
                            f"{returned['result']['artist_names']} - {returned['result']['title']}",
                        )
                        for returned in possible_matches
                    ]
                    searched: str = f"{artist} - {song}"
                    best_match: tuple = self.matcher.find_best_match(
                        input_track=searched, candidate_tracks=to_scrape
                    )
                    ((scrape_stub, track), confidence) = best_match
                    scrape_url: str = f"{self.genius_url}{scrape_stub[1:]}"
                    async with client.get(
                        scrape_url, timeout=self.timeout, headers=self.headers
                    ) as scrape_request:
                        scrape_request.raise_for_status()
                        scrape_text: Optional[str] = await scrape_request.text()
                        if not scrape_text:
                            raise InvalidGeniusResponseException("No scrape response.")
                        if len(scrape_text) < 100:
                            raise InvalidGeniusResponseException(
                                "Scrape response was invalid (len < 100 chars.)"
                            )
                        html = BeautifulSoup(
                            htm.unescape(scrape_text).replace("<br/>", "\n"),
                            "html.parser",
                        )
                        # Drop Genius page header elements before extracting lyric text.
                        header_tags_genius: Optional[ResultSet] = html.find_all(
                            class_=re.compile(r".*Header.*")
                        )
                        if header_tags_genius:
                            for tag in header_tags_genius:
                                tag.extract()
                        divs: Optional[ResultSet] = html.find_all(
                            "div", {"data-lyrics-container": "true"}
                        )
                        if not divs:
                            return None
                        for div in divs:
                            header_tags: Optional[ResultSet] = div.find_all(
                                ["h1", "h2", "h3", "h4", "h5"]
                            )
                            if header_tags:
                                for tag in header_tags:
                                    tag.extract()
                            returned_lyrics += div.get_text()
                        returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
                        artist, song = track.split(" - ", maxsplit=1)
                        logging.info("Result found on %s", self.label)
                        time_end: float = time.time()
                        time_diff: float = time_end - time_start
                        matched = LyricsResult(
                            artist=artist,
                            song=song,
                            src=self.label,
                            lyrics=returned_lyrics,
                            confidence=confidence,
                            time=time_diff,
                        )
                        await self.redis_cache.increment_found_count(self.label)
                        await self.cache.store(matched)
                        return matched
        except Exception:
            # Any failure (network, parsing, matching) is logged and treated as "no result";
            # catching Exception (not a bare except) avoids swallowing task cancellation.
            traceback.print_exc()
            return None
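

# Usage sketch (illustrative only, not part of the module): how a caller might
# drive Genius.search from an async entry point. The import path
# "lyric_search.sources.genius" and the example artist/song are assumptions,
# not taken from this file.
#
#   import asyncio
#   from lyric_search.sources.genius import Genius
#
#   async def main() -> None:
#       result = await Genius().search("the beatles", "let it be")
#       if result:
#           print(f"{result.artist} - {result.song} (confidence: {result.confidence})")
#           print(result.lyrics[:200])
#
#   asyncio.run(main())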