132 lines
6.0 KiB
Python
132 lines
6.0 KiB
Python
#!/usr/bin/env python3.12
|
|
# pylint: disable=bare-except, broad-exception-caught, wrong-import-order, wrong-import-position
|
|
|
|
import sys
|
|
sys.path.insert(1,'..')
|
|
import traceback
|
|
import logging
|
|
import time
|
|
from typing import Optional
|
|
from aiohttp import ClientTimeout, ClientSession
|
|
from bs4 import BeautifulSoup, ResultSet
|
|
import html as htm
|
|
from . import private
|
|
from . import common
|
|
from . import cache
|
|
from lyric_search_new import utils
|
|
from lyric_search_new.constructors import LyricsResult
|
|
|
|
|
|
# getLogger() is called without a name, so this is the root logger.
logger = logging.getLogger()
# Textual name of the root logger's level, captured once at import time.
log_level = logging.getLevelName(logger.level)
|
|
|
|
class InvalidResponseException(Exception):
    """
    Raised when a fetched response is unusable: the body is shorter than
    100 characters or the decoded JSON does not have the expected structure.
    """
|
|
|
|
class Genius:
    """
    Genius Search Module

    Queries the Genius song-search endpoint, picks the best candidate with
    utils.TrackMatcher, scrapes the lyrics containers from the matched page
    and stores the resulting LyricsResult through cache.Cache.
    """

    def __init__(self):
        self.label: str = "Genius"
        self.genius_url: str = private.GENIUS_URL
        # The search term is appended directly after "q=".
        self.genius_search_url: str = f'{self.genius_url}api/search/song?q='
        self.headers: dict = common.SCRAPE_HEADERS
        self.timeout = ClientTimeout(connect=3, sock_read=5)
        self.datautils = utils.DataUtils()
        self.matcher = utils.TrackMatcher()
        self.cache = cache.Cache()

    @staticmethod
    def _extract_hits(search_data) -> list:
        """
        Validate the search JSON and return response->sections[0]->hits.

        Args:
            search_data: the decoded JSON body of the search request
        Returns:
            list: the hits of the first section
        Raises:
            InvalidResponseException: if any level of the expected
                structure is missing or has the wrong type.
        """
        if not isinstance(search_data, dict):
            raise InvalidResponseException("Invalid JSON.")

        response = search_data.get('response')
        if not isinstance(response, dict):
            raise InvalidResponseException(f"Invalid JSON: Cannot find response key.\n{search_data}")

        sections = response.get('sections')
        if not isinstance(sections, list):
            raise InvalidResponseException(f"Invalid JSON: Cannot find response->sections key.\n{search_data}")

        # An empty sections list or a non-dict first section has no hits either.
        first_section = sections[0] if sections else None
        hits = first_section.get('hits') if isinstance(first_section, dict) else None
        if not isinstance(hits, list):
            raise InvalidResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.")
        return hits

    # pylint: disable=unused-argument
    async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
        """
        Genius Search
        Args:
            artist (str): the artist to search
            song (str): the song to search
            **kwargs: accepted but not used by this method
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
            Any Exception raised while searching or scraping is printed via
            traceback and turned into None; cancellation and interpreter-exit
            signals (BaseException subclasses) are allowed to propagate.
        """
        # pylint: enable=unused-argument
        # Local import so this block does not depend on the file's import header.
        from urllib.parse import quote  # pylint: disable=import-outside-toplevel

        try:
            artist = artist.strip().lower()
            song = song.strip().lower()
            time_start: float = time.time()
            logging.info("Searching %s - %s on %s",
                         artist, song, self.label)
            # Percent-encode the whole term so characters such as '&', '#'
            # or '+' cannot break out of the q= parameter.
            search_term: str = quote(f'{artist} {song}', safe='')

            async with ClientSession() as client:
                async with client.get(f'{self.genius_search_url}{search_term}',
                                      timeout=self.timeout,
                                      headers=self.headers) as request:
                    request.raise_for_status()
                    text: str | None = await request.text()
                    if len(text) < 100:
                        raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
                    search_data = await request.json()

                possible_matches: list = self._extract_hits(search_data)
                # Each candidate is (page path, "artist names - title").
                to_scrape: list[tuple] = [
                    (
                        returned['result']['path'],
                        f"{returned['result']['artist_names']} - {returned['result']['title']}",
                    ) for returned in possible_matches
                ]
                if not to_scrape:
                    return None

                searched: str = f"{artist} - {song}"
                best_match = self.matcher.find_best_match(input_track=searched,
                                                          candidate_tracks=to_scrape)
                # NOTE(review): assumes the matcher returns a falsy value when
                # nothing is close enough - confirm against utils.TrackMatcher.
                if not best_match:
                    return None
                ((scrape_stub, track), confidence) = best_match
                # Drop the first character of the path (presumably a leading
                # '/', since genius_url is joined without a separator).
                scrape_url: str = f'{self.genius_url}{scrape_stub[1:]}'

                async with client.get(scrape_url,
                                      timeout=self.timeout,
                                      headers=self.headers) as scrape_request:
                    scrape_request.raise_for_status()
                    scrape_text: str | None = await scrape_request.text()
                    if len(scrape_text) < 100:
                        raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)")

            # Turn <br/> into newlines before parsing so line breaks survive get_text().
            html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser")
            divs: ResultSet | None = html.find_all("div", {"data-lyrics-container": "true"})
            if not divs:
                return None

            returned_lyrics: str = self.datautils.scrub_lyrics(
                "".join(div.get_text() for div in divs)
            )
            # Report the matched track's artist and title, not the user's input.
            artist, song = track.split(" - ", maxsplit=1)
            logging.info("Result found on %s", self.label)
            time_diff: float = time.time() - time_start
            matched = LyricsResult(artist=artist,
                                   song=song,
                                   src=self.label,
                                   lyrics=returned_lyrics,
                                   confidence=confidence,
                                   time=time_diff)
            await self.cache.store(matched)
            return matched
        except Exception:
            # Best-effort search: report the failure and signal "not found".
            # Not a bare except, so asyncio.CancelledError and
            # KeyboardInterrupt are no longer swallowed.
            traceback.print_exc()
            return None