import sys
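# Path hack: prepend the parent directory ("..") to the import search path.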
sys.path.insert(1, "..")
import traceback
import logging
import time
import re
from typing import Optional
from aiohttp import ClientTimeout, ClientSession
from bs4 import BeautifulSoup, ResultSet # type: ignore
import html as htm
from . import private, common, cache, redis_cache
from lyric_search import utils
from lyric_search.constructors import LyricsResult, InvalidGeniusResponseException
logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)


class Genius:
    """
    Genius Search Module
    """

    def __init__(self) -> None:
        self.label: str = "Genius"
        self.genius_url: str = private.GENIUS_URL
        self.genius_search_url: str = f"{self.genius_url}api/search/song?q="
        self.headers: dict = common.SCRAPE_HEADERS
        self.timeout = ClientTimeout(connect=3, sock_read=5)
        self.datautils = utils.DataUtils()
        self.matcher = utils.TrackMatcher()
        self.cache = cache.Cache()
        self.redis_cache = redis_cache.RedisCache()

    async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
        """
        Genius Search

        Args:
            artist (str): the artist to search
            song (str): the song to search

        Returns:
            Optional[LyricsResult]: The result, if found; None otherwise.
        """
        try:
            # Normalize inputs so matching is case-insensitive.
            artist = artist.strip().lower()
            song = song.strip().lower()
            time_start: float = time.time()
            logging.info("Searching %s - %s on %s", artist, song, self.label)
            search_term: str = f"{artist}%20{song}"
            returned_lyrics: str = ""
            async with ClientSession() as client:
                # First request: query the Genius search API for candidate tracks.
                async with client.get(
                    f"{self.genius_search_url}{search_term}",
                    timeout=self.timeout,
                    headers=self.headers,
                ) as request:
                    request.raise_for_status()
                    text: Optional[str] = await request.text()
                    if not text:
                        raise InvalidGeniusResponseException("No search response.")
                    if len(text) < 100:
                        raise InvalidGeniusResponseException(
                            "Search response text was invalid (len < 100 chars.)"
                        )
                    search_data = await request.json()
                    # Validate the expected response shape:
                    # {"response": {"sections": [{"hits": [...]}]}}
                    if not isinstance(search_data, dict):
                        raise InvalidGeniusResponseException("Invalid JSON.")
                    if not isinstance(search_data["response"], dict):
                        raise InvalidGeniusResponseException(
                            f"Invalid JSON: Cannot find response key.\n{search_data}"
                        )
                    if not isinstance(search_data["response"]["sections"], list):
                        raise InvalidGeniusResponseException(
                            f"Invalid JSON: Cannot find response->sections key.\n{search_data}"
                        )
                    if not isinstance(
                        search_data["response"]["sections"][0]["hits"], list
                    ):
                        raise InvalidGeniusResponseException(
                            "Invalid JSON: Cannot find response->sections[0]->hits key."
                        )
                    possible_matches: list = search_data["response"]["sections"][0][
                        "hits"
                    ]
                    # Pair each hit's URL path with an "artist - title" string
                    # for fuzzy matching against the requested track.
                    to_scrape: list[tuple] = [
                        (
                            returned["result"]["path"],
                            f"{returned['result']['artist_names']} - {returned['result']['title']}",
                        )
                        for returned in possible_matches
                    ]
                    searched: str = f"{artist} - {song}"
                    best_match: tuple = self.matcher.find_best_match(
                        input_track=searched, candidate_tracks=to_scrape
                    )
                    ((scrape_stub, track), confidence) = best_match
                    # Second request: fetch the lyrics page for the best match.
                    scrape_url: str = f"{self.genius_url}{scrape_stub[1:]}"
                    async with client.get(
                        scrape_url, timeout=self.timeout, headers=self.headers
                    ) as scrape_request:
                        scrape_request.raise_for_status()
                        scrape_text: Optional[str] = await scrape_request.text()
                        if not scrape_text:
                            raise InvalidGeniusResponseException("No scrape response.")
                        if len(scrape_text) < 100:
                            raise InvalidGeniusResponseException(
                                "Scrape response was invalid (len < 100 chars.)"
                            )
                        html = BeautifulSoup(
                            htm.unescape(scrape_text).replace("<br/>", "\n"),
                            "html.parser",
                        )
                        # Remove Genius page-header elements before extracting text.
                        header_tags_genius: Optional[ResultSet] = html.find_all(
                            class_=re.compile(r".*Header.*")
                        )
                        if header_tags_genius:
                            for tag in header_tags_genius:
                                tag.extract()
                        divs: Optional[ResultSet] = html.find_all(
                            "div", {"data-lyrics-container": "true"}
                        )
                        if not divs:
                            return None
                        for div in divs:
                            # Strip embedded section headers from each lyrics container.
                            header_tags: Optional[ResultSet] = div.find_all(
                                ["h1", "h2", "h3", "h4", "h5"]
                            )
                            if header_tags:
                                for tag in header_tags:
                                    tag.extract()
                            returned_lyrics += div.get_text()
                        returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
                        # Report the artist/title of the matched track, not the raw query.
                        artist = track.split(" - ", maxsplit=1)[0]
                        song = track.split(" - ", maxsplit=1)[1]
                        logging.info("Result found on %s", self.label)
                        time_end: float = time.time()
                        time_diff: float = time_end - time_start
                        matched = LyricsResult(
                            artist=artist,
                            song=song,
                            src=self.label,
                            lyrics=returned_lyrics,
                            confidence=confidence,
                            time=time_diff,
                        )
                        # Record the hit and cache the result before returning.
                        await self.redis_cache.increment_found_count(self.label)
                        await self.cache.store(matched)
                        return matched
        except Exception:
            # Catch Exception (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate; log the traceback and return None.
            traceback.print_exc()
            return None
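
# Minimal usage sketch (not part of the original module): shows how a caller
# might drive Genius.search from asyncio. The artist/song values and the
# import path are placeholders; private.GENIUS_URL and the cache backends
# must be configured for a real run.
#
#     import asyncio
#     from lyric_search.sources.genius import Genius  # hypothetical path
#
#     async def main() -> None:
#         result = await Genius().search("some artist", "some song")
#         if result:
#             print(f"{result.artist} - {result.song} ({result.confidence})")
#             print(result.lyrics)
#
#     asyncio.run(main())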