diff --git a/endpoints/lyric_search.py b/endpoints/lyric_search.py
index 864aed7..290c4de 100644
--- a/endpoints/lyric_search.py
+++ b/endpoints/lyric_search.py
@@ -9,7 +9,8 @@
 import regex
 import aiohttp
 from fastapi import FastAPI, HTTPException
-from pydantic import BaseModel
+from pydantic import BaseModel
+from lyric_search_new.sources import aggregate
 
 
 class ValidLyricRequest(BaseModel):
@@ -65,7 +66,8 @@ class LyricSearch(FastAPI):
         self.endpoints = {
             "lyric_search": self.lyric_search_handler,
             "lyric_cache_list": self.lyric_cache_list_handler,
-            "lyric_search_history": self.lyric_search_log_handler
+            "lyric_search_history": self.lyric_search_log_handler,
+            "lyric_search_test": self.new_test,
         }
 
         self.acceptable_request_sources = [
@@ -102,7 +104,32 @@ class LyricSearch(FastAPI):
             'err': False,
             'history': last_10k_sings
         }
+
 
+    async def new_test(self, data: ValidLyricRequest):
+        """
+        Search for lyrics (testing)
+        - **a**: artist
+        - **s**: song
+        - **t**: track (artist and song combined) [used only if a & s are not used] [unused]
+        - **extra**: include extra details in response [optional, default: false] [unused]
+        - **lrc**: Request LRCs? [unused]
+        - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none] [unused]
+        - **src**: the script/utility which initiated the request [unused]
+        """
+
+        if not data.a or not data.s:
+            raise HTTPException(detail="Invalid request", status_code=500)
+
+        aggregate_search = aggregate.Aggregate()
+        result = await aggregate_search.search(data.a, data.s)
+        if not result:
+            # Aggregate.search returns None when no source matched; calling
+            # .dict() on None would raise AttributeError
+            raise HTTPException(detail="No result found", status_code=404)
+        return result.dict()
+
+
 
     async def lyric_search_handler(self, data: ValidLyricRequest):
         """
diff --git a/lyric_search_new/constructors.py b/lyric_search_new/constructors.py
new file mode 100644
index 0000000..946f92d
--- /dev/null
+++ b/lyric_search_new/constructors.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python3.12
+
+from dataclasses import dataclass, asdict
+
+@dataclass
+class LyricsResult:
+    """
+    Class for returned Lyrics Results
+
+    Holds the matched artist and song, the source label (src), the
+    lyrics text and the match confidence.
+    """
+    artist: str
+    song: str
+    src: str
+    lyrics: str
+    confidence: float
+
+    def dict(self):
+        """Return as dict (every value, confidence included, is converted to str)"""
+        return {k: str(v) for k, v in asdict(self).items()}
\ No newline at end of file
diff --git a/lyric_search_new/sources/__init__.py b/lyric_search_new/sources/__init__.py
index ef1cd04..e69de29 100644
--- a/lyric_search_new/sources/__init__.py
+++ b/lyric_search_new/sources/__init__.py
@@ -1,4 +0,0 @@
-from . import cache
-from . import genius
-from . import spotify
-from . import common
\ No newline at end of file
diff --git a/lyric_search_new/sources/aggregate.py b/lyric_search_new/sources/aggregate.py
new file mode 100644
index 0000000..f50cae4
--- /dev/null
+++ b/lyric_search_new/sources/aggregate.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python3.12
+# pylint: disable=wrong-import-order
+
+from typing import Optional
+from lyric_search_new.constructors import LyricsResult
+import sys
+sys.path.insert(1,'..')
+from . import cache
+from . import genius
+
+class Aggregate:
+    """Aggregate all source methods"""
+
+    def __init__(self, exclude_methods=None):
+        """
+        @exclude_methods: names of sources to skip ("cache", "genius"); None or empty skips nothing
+        """
+        if not exclude_methods:
+            exclude_methods = []
+        self.exclude_methods = exclude_methods
+
+    async def search(self, artist: str, song: str) -> Optional[LyricsResult]:
+        """
+        Try each enabled source in order (cache, then Genius)
+        @artist: the artist to search
+        @song: the song to search
+        Returns:
+            - first LyricsResult found, **None** if no enabled source had a match
+        """
+        search = None
+        if "cache" not in self.exclude_methods:
+            # First, try cache
+            search = await cache.Cache().search(artist, song)
+            if not search:
+                print("Cache: NOT FOUND!")
+        if not search and "genius" not in self.exclude_methods:
+            # Then try Genius
+            search = await genius.Genius().search(artist, song)
+
+        return search
diff --git a/lyric_search_new/sources/cache.py b/lyric_search_new/sources/cache.py
index 2a9998b..8f80baa 100644
--- a/lyric_search_new/sources/cache.py
+++ b/lyric_search_new/sources/cache.py
@@ -1,6 +1,71 @@
 #!/usr/bin/env python3.12
 
+import os
+import sys
+sys.path.insert(1,'..')
+import aiosqlite as sqlite3
+from typing import Optional
+from . import private
+from . import common
+from lyric_search_new import utils
+from lyric_search_new.constructors import LyricsResult
+
 class Cache:
     """Cache Search Module"""
     def __init__(self):
-        pass
\ No newline at end of file
+        self.cache_db = os.path.join("/", "var",
+                                     "lib", "singerdbs",
+                                     "cached_lyrics.db")
+
+        self.cache_pre_query = "pragma journal_mode = WAL; pragma synchronous = normal; pragma temp_store = memory; pragma mmap_size = 30000000000;"
+        self.sqlite_exts = ['/usr/local/lib/python3.11/dist-packages/spellfix1.cpython-311-x86_64-linux-gnu.so']
+
+    def get_matched(self, sqlite_rows, matched_candidate, confidence) -> Optional[LyricsResult]:
+        """
+        Build the LyricsResult for the row the matcher selected
+        @sqlite_rows: rows fetched from the lyrics table
+        @matched_candidate: (id, "artist - song") tuple chosen by the matcher
+        @confidence: matcher score to attach to the result
+        Returns:
+            - LyricsResult for the row whose id matches, **None** otherwise
+        """
+        matched_id = matched_candidate[0]
+        for row in sqlite_rows:
+            if row[0] == matched_id:
+                (_id, artist, song, lyrics, original_src, _confidence) = row
+                return LyricsResult(
+                    artist=artist,
+                    song=song,
+                    lyrics=lyrics,
+                    src=f"{original_src} (cached, id: {_id})",
+                    confidence=confidence)
+        return None
+
+    async def search(self, artist: str, song: str) -> Optional[LyricsResult]:
+        """
+        @artist: the artist to search
+        @song: the song to search
+        Returns:
+            - LyricsResult corresponding to nearest match found (if found), **None** otherwise
+        """
+        async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
+            await db_conn.enable_load_extension(True)
+            for ext in self.sqlite_exts:
+                await db_conn.load_extension(ext)
+            async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
+                # editdist3() comes from the spellfix1 extension loaded above
+                search_query = 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics WHERE editdist3((artist || " " || song), (? || " " || ?))\
+                    <= 410 ORDER BY editdist3((artist || " " || song), ?) ASC LIMIT 10'
+                async with await _db_cursor.execute(search_query, (artist.strip(), song.strip(), f"{artist.strip()} {song.strip()}")) as db_cursor:
+                    results = await db_cursor.fetchall()
+                    result_tracks = [(row[0], f"{row[1]} - {row[2]}") for row in results]
+                    input_track = f"{artist} - {song}"
+                    matcher = utils.TrackMatcher()
+                    best_match = matcher.find_best_match(input_track=input_track,
+                                                         candidate_tracks=result_tracks)
+                    if not best_match:
+                        return None
+                    (candidate, confidence) = best_match
+                    return self.get_matched(sqlite_rows=results,
+                                            matched_candidate=candidate,
+                                            confidence=confidence)
diff --git a/lyric_search_new/sources/genius.py b/lyric_search_new/sources/genius.py
index b096119..89c6324 100644
--- a/lyric_search_new/sources/genius.py
+++ b/lyric_search_new/sources/genius.py
@@ -1,19 +1,108 @@
 #!/usr/bin/env python3.12
+# pylint: disable=bare-except, broad-exception-caught, wrong-import-position
 
-from aiohttp import ClientTimeout, ClientSession, ClientError
-from .. import private
+import sys
+sys.path.insert(1,'..')
+import traceback
+from urllib.parse import quote
+from aiohttp import ClientTimeout, ClientSession
+from bs4 import BeautifulSoup
+import html as htm
+from . import private
 from . import common
+from lyric_search_new import utils
+from lyric_search_new.constructors import LyricsResult
+
+class InvalidResponseException(Exception):
+    """
+    Raised when a Genius response is too short or lacks the expected JSON structure
+    """
 
 class Genius:
     """Genius Search Module"""
     def __init__(self):
-        self.genius_url = private.genius_url
+        self.label = "Genius"
+        self.genius_url = private.GENIUS_URL
         self.genius_search_url = f'{self.genius_url}api/search/song?q='
         self.headers = common.SCRAPE_HEADERS
-        self.timeout = ClientTimeout(connect=2, sock_read=2.5)
+        self.timeout = ClientTimeout(connect=2, sock_read=4)
+        self.datautils = utils.DataUtils()
+        self.matcher = utils.TrackMatcher()
 
     async def search(self, artist: str, song: str):
         """
         @artist: the artist to search
         @song: the song to search
-        """
\ No newline at end of file
+        Returns:
+            - LyricsResult for the best matching Genius hit, **None** if nothing matched or the lookup failed
+        """
+        try:
+            # Percent-encode the query so characters such as & or # in a name cannot alter the URL
+            search_term = quote(f'{artist} {song}')
+            async with ClientSession() as client:
+                async with client.get(f'{self.genius_search_url}{search_term}',
+                                      timeout=self.timeout,
+                                      headers=self.headers) as request:
+                    request.raise_for_status()
+                    text = await request.text()
+
+                    if len(text) < 100:
+                        raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
+                    search_data = await request.json()
+
+                if not isinstance(search_data, dict):
+                    raise InvalidResponseException("Invalid JSON.")
+
+                if not isinstance(search_data.get('response'), dict):
+                    raise InvalidResponseException(f"Invalid JSON: Cannot find response key.\n{search_data}")
+
+                if not isinstance(search_data['response'].get('sections'), list):
+                    raise InvalidResponseException(f"Invalid JSON: Cannot find response->sections key.\n{search_data}")
+
+                if not isinstance(search_data['response']['sections'][0]['hits'], list):
+                    raise InvalidResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.")
+
+                possible_matches = search_data['response']['sections'][0]['hits']
+                to_scrape = [
+                    (
+                        returned['result']['path'],
+                        f'{returned['result']['artist_names']} - {returned['result']['title']}',
+                    ) for returned in possible_matches
+                ]
+                searched = f"{artist} - {song}"
+                best_match = self.matcher.find_best_match(input_track=searched,
+                                                          candidate_tracks=to_scrape)
+                if not best_match:
+                    # No hit was close enough (or there were no hits); unpacking None below would raise
+                    return None
+                ((scrape_stub, track), confidence) = best_match
+                scrape_url = f'{self.genius_url}{scrape_stub[1:]}'
+
+                async with client.get(scrape_url,
+                                      timeout=self.timeout,
+                                      headers=self.headers) as scrape_request:
+                    scrape_request.raise_for_status()
+                    scrape_text = await scrape_request.text()
+
+                if len(scrape_text) < 100:
+                    raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)")
+
+                # Turn <br/> tags into newlines before parsing so line breaks survive get_text()
+                html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser")
+                divs = html.find_all("div", {"data-lyrics-container": "true"})
+
+                if not divs:
+                    return None
+
+                returned_lyrics = ''.join(div.get_text() for div in divs)
+                returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
+                (artist, song) = track.split(" - ", maxsplit=1)
+                return LyricsResult(artist=artist,
+                                    song=song,
+                                    src=self.label,
+                                    lyrics=returned_lyrics,
+                                    confidence=confidence)
+
+        except Exception:
+            traceback.print_exc()
+            return None
diff --git a/lyric_search_new/tests.py b/lyric_search_new/tests.py
new file mode 100644
index 0000000..bb8ef41
--- /dev/null
+++ b/lyric_search_new/tests.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3.12
+# tests
+
+import asyncio
+import sys
+import sources.cache, sources.genius, sources.aggregate
+import utils
+
+test_artist = "hopsin"
+test_song = "ill mind of hopsin 5"
+
+async def test_cache(artist, song):
+    """Look the track up in the cache and print the result"""
+    cache = sources.cache.Cache()
+    result = await cache.search(artist, song)
+    if not result:
+        print(f"Could not find {artist} - {song}!")
+        return
+    print(result.dict())
+
+async def test_genius(artist=None, song=None):
+    """Look the track up on Genius and print the result"""
+    if not artist or not song:
+        artist = test_artist
+        song = test_song
+    genius = sources.genius.Genius()
+    result = await genius.search(artist, song)
+    print(result)
+
+async def test_aggregate(artist=None, song=None):
+    """Look the track up through the aggregate search and print the result"""
+    if not artist or not song:
+        artist = test_artist
+        song = test_song
+    aggregate = sources.aggregate.Aggregate()
+    result = await aggregate.search(artist, song)
+    if not result:
+        # Aggregate.search returns None when no source matched
+        print(f"Could not find {artist} - {song}!")
+        return
+    print(result.dict())
+
+async def main():
+    """Run every check in sequence"""
+    await test_genius()
+    await test_cache(artist=test_artist, song=test_song)
+    await test_aggregate()
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/lyric_search_new/utils.py b/lyric_search_new/utils.py
index 3622f6e..f439fc1 100644
--- a/lyric_search_new/utils.py
+++ b/lyric_search_new/utils.py
@@ -2,7 +2,7 @@
 
 from difflib import SequenceMatcher
 from typing import List, Optional, Tuple
-import re
+import regex
 
 class TrackMatcher:
     """Track Matcher"""
@@ -16,16 +16,16 @@ class TrackMatcher:
         """
         self.threshold = threshold
 
-    def find_best_match(self, input_track: str, candidate_tracks: List[str]) -> Optional[Tuple[str, float]]:
+    def find_best_match(self, input_track: str, candidate_tracks: List[tuple[int|str, str]]) -> Optional[Tuple[tuple[int|str, str], float]]:
         """
         Find the best matching track from the candidate list.
 
         Args:
             input_track (str): Input track in "ARTIST - SONG" format
-            candidate_tracks (List[str]): List of candidate tracks in same format
+            candidate_tracks (List[tuple[int|str, str]]): List of (identifier, "ARTIST - SONG") candidates
 
         Returns:
-            Optional[Tuple[str, float]]: Tuple of (best matching track, similarity score)
+            Optional[Tuple[tuple[int|str, str], float]]: Tuple of (best matching candidate, similarity score)
                 or None if no good match found
         """
         if not input_track or not candidate_tracks:
@@ -33,12 +33,14 @@ class TrackMatcher:
 
         # Normalize input track
         input_track = self._normalize_string(input_track)
+
+        print(f"input_track: {input_track}")
 
         best_match = None
         best_score = 0
 
         for candidate in candidate_tracks:
-            normalized_candidate = self._normalize_string(candidate)
+            normalized_candidate = self._normalize_string(candidate[1])
 
             # Calculate various similarity scores
             exact_score = 1.0 if input_track == normalized_candidate else 0.0
@@ -61,7 +63,7 @@ class TrackMatcher:
         extra spaces, and converting to lowercase.
         """
         # Remove special characters and convert to lowercase
-        text = re.sub(r'[^\w\s-]', '', text.lower())
+        text = regex.sub(r'[^\w\s-]', '', text.lower())
         # Normalize spaces
         text = ' '.join(text.split())
         return text
@@ -79,4 +81,22 @@ class TrackMatcher:
         intersection = tokens1.intersection(tokens2)
         union = tokens1.union(tokens2)
 
-        return len(intersection) / len(union)
\ No newline at end of file
+        return len(intersection) / len(union)
+
+class DataUtils:
+    """
+    Data Utils
+    """
+    def scrub_lyrics(self, lyrics: str) -> str:
+        """
+        Strip scrape artefacts from lyrics text
+        @lyrics: raw lyrics text
+        Returns:
+            - the lyrics with the patterns below removed
+        """
+        # Regex chain
+        lyrics = regex.sub(r'(\[.*?\])(\s){0,}(\:){0,1}', '', lyrics) # Bracketed spans, e.g. [Chorus]
+        lyrics = regex.sub(r'(\d?)(Embed\b)', '', lyrics, flags=regex.IGNORECASE) # "Embed" and one preceding digit
+        lyrics = regex.sub(r'\n{2}', '\n', lyrics) # Gaps between verses
+        lyrics = regex.sub(r'[0-9]\b$', '', lyrics) # Single digit left at the end of the text
+        return lyrics