progress
This commit is contained in:
parent
945a3d9bf6
commit
86946f0316
@ -10,6 +10,7 @@ import aiohttp
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
from lyric_search_new.sources import aggregate
|
||||
|
||||
|
||||
class ValidLyricRequest(BaseModel):
|
||||
@ -65,7 +66,8 @@ class LyricSearch(FastAPI):
|
||||
self.endpoints = {
|
||||
"lyric_search": self.lyric_search_handler,
|
||||
"lyric_cache_list": self.lyric_cache_list_handler,
|
||||
"lyric_search_history": self.lyric_search_log_handler
|
||||
"lyric_search_history": self.lyric_search_log_handler,
|
||||
"lyric_search_test": self.new_test,
|
||||
}
|
||||
|
||||
self.acceptable_request_sources = [
|
||||
@ -103,6 +105,27 @@ class LyricSearch(FastAPI):
|
||||
'history': last_10k_sings
|
||||
}
|
||||
|
||||
async def new_test(self, data: ValidLyricRequest):
    """
    Search for lyrics (testing)

    - **a**: artist
    - **s**: song
    - **t**: track (artist and song combined) [used only if a & s are not used] [unused]
    - **extra**: include extra details in response [optional, default: false] [unused]
    - **lrc**: Request LRCs? [unused]
    - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none] [unused]
    - **src**: the script/utility which initiated the request [unused]
    """

    # Both artist and song are mandatory for this handler.
    if not data.a or not data.s:
        raise HTTPException(detail="Invalid request", status_code=500)

    aggregate_search = aggregate.Aggregate()
    result = await aggregate_search.search(data.a, data.s)

    # Aggregate.search() is annotated Optional[LyricsResult]: when no source
    # yields a match it returns None, and calling .dict() on that would crash
    # with AttributeError. Report the miss explicitly instead.
    if not result:
        raise HTTPException(detail="Lyrics not found", status_code=404)

    return result.dict()
|
||||
|
||||
|
||||
|
||||
async def lyric_search_handler(self, data: ValidLyricRequest):
|
||||
"""
|
||||
|
16
lyric_search_new/constructors.py
Normal file
16
lyric_search_new/constructors.py
Normal file
@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env python3.12
|
||||
|
||||
from dataclasses import dataclass, asdict
|
||||
|
||||
@dataclass
class LyricsResult:
    """Container for a single lyrics lookup result."""
    artist: str
    song: str
    src: str
    lyrics: str
    confidence: float

    def dict(self):
        """Return the fields as a dict with every value converted to ``str``."""
        field_values = asdict(self)
        return {name: str(field_values[name]) for name in field_values}
|
@ -1,4 +0,0 @@
|
||||
from . import cache
|
||||
from . import genius
|
||||
from . import spotify
|
||||
from . import common
|
31
lyric_search_new/sources/aggregate.py
Normal file
31
lyric_search_new/sources/aggregate.py
Normal file
@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env python3.12
|
||||
# pylint: disable=wrong-import-order
|
||||
|
||||
from typing import Optional
|
||||
from lyric_search_new.constructors import LyricsResult
|
||||
import sys
|
||||
sys.path.insert(1,'..')
|
||||
sys.path.insert(1,'..')
|
||||
from . import cache
|
||||
from . import genius
|
||||
class Aggregate:
    """Aggregate all source methods.

    Sources are tried in order (cache first, then Genius); the first one
    that yields a result wins.
    """

    def __init__(self, exclude_methods=None):
        """
        @exclude_methods: optional list of source names to skip
                          ("cache" and/or "genius"); defaults to none skipped
        """
        if not exclude_methods:
            exclude_methods = []
        self.exclude_methods = exclude_methods

    async def search(self, artist: str, song: str) -> Optional[LyricsResult]:
        """
        @artist: the artist to search
        @song: the song to search
        Returns:
            - LyricsResult from the first source that finds a match,
              **None** if no enabled source finds one
        """
        search = None

        if "cache" not in self.exclude_methods:
            # First, try cache. The source object is only built when it is
            # actually going to be queried.
            search = await cache.Cache().search(artist, song)
            if search:
                return search
            print("Cache: NOT FOUND!")

        if "genius" not in self.exclude_methods:
            # Then try Genius (previously queried even when excluded).
            search = await genius.Genius().search(artist, song)

        return search
|
@ -1,6 +1,69 @@
|
||||
#!/usr/bin/env python3.12
|
||||
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(1,'..')
|
||||
import aiosqlite as sqlite3
|
||||
from typing import Optional
|
||||
from . import private
|
||||
from . import common
|
||||
from lyric_search_new import utils
|
||||
from lyric_search_new.constructors import LyricsResult
|
||||
|
||||
class Cache:
    """Cache Search Module: looks tracks up in the local SQLite lyrics cache."""

    def __init__(self):
        # Location of the cached-lyrics SQLite database.
        self.cache_db = os.path.join("/", "var", "lib", "singerdbs", "cached_lyrics.db")

        # PRAGMAs executed on every new connection before the search query.
        self.cache_pre_query = "pragma journal_mode = WAL; pragma synchronous = normal; pragma temp_store = memory; pragma mmap_size = 30000000000;"
        # SQLite extensions loaded on every connection; the search query
        # below relies on editdist3().
        self.sqlite_exts = ['/usr/local/lib/python3.11/dist-packages/spellfix1.cpython-311-x86_64-linux-gnu.so']

    def get_matched(self, sqlite_rows, matched_candidate, confidence) -> Optional[LyricsResult]:
        """
        @sqlite_rows: rows as (id, artist, song, lyrics, src, confidence)
        @matched_candidate: candidate tuple whose first element is the row id
        @confidence: match confidence to attach to the result
        Returns:
            - LyricsResult built from the row with the matching id,
              **None** if no row has that id
        """
        wanted_id = matched_candidate[0]
        hit = next((row for row in sqlite_rows if row[0] == wanted_id), None)
        if hit is None:
            return None
        (_id, artist, song, lyrics, original_src, _confidence) = hit
        return LyricsResult(
            artist=artist,
            song=song,
            lyrics=lyrics,
            src=f"{original_src} (cached, id: {_id})",
            confidence=confidence)

    async def search(self, artist: str, song: str):
        """
        @artist: the artist to search
        @song: the song to search
        Returns:
            - LyricsResult corresponding to nearest match found (if found), **None** otherwise
        """
        async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
            await db_conn.enable_load_extension(True)
            for extension_path in self.sqlite_exts:
                await db_conn.load_extension(extension_path)

            async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
                search_query = (
                    'SELECT id, artist, song, lyrics, src, confidence FROM lyrics WHERE editdist3((artist || " " || song), (? || " " || ?))'
                    '<= 410 ORDER BY editdist3((artist || " " || song), ?) ASC LIMIT 10'
                )
                query_params = (artist.strip(), song.strip(), f"{artist.strip()} {song.strip()}")

                async with await _db_cursor.execute(search_query, query_params) as db_cursor:
                    results = await db_cursor.fetchall()

                    # Reduce each row to (id, "ARTIST - SONG") for the matcher.
                    result_tracks = [
                        (row_id, f"{row_artist} - {row_song}")
                        for (row_id, row_artist, row_song, _lyrics, _src, _conf) in results
                    ]

                    best_match = utils.TrackMatcher().find_best_match(
                        input_track=f"{artist} - {song}",
                        candidate_tracks=result_tracks)
                    if not best_match:
                        return None

                    (candidate, confidence) = best_match
                    return self.get_matched(sqlite_rows=results,
                                            matched_candidate=candidate,
                                            confidence=confidence)
|
||||
|
||||
|
||||
|
||||
|
@ -1,19 +1,105 @@
|
||||
#!/usr/bin/env python3.12
|
||||
# pylint: disable=bare-except, broad-exception-caught, wrong-import-position
|
||||
|
||||
from aiohttp import ClientTimeout, ClientSession, ClientError
|
||||
from .. import private
|
||||
import sys
|
||||
sys.path.insert(1,'..')
|
||||
import traceback
|
||||
from aiohttp import ClientTimeout, ClientSession
|
||||
from bs4 import BeautifulSoup
|
||||
import html as htm
|
||||
from . import private
|
||||
from . import common
|
||||
from lyric_search_new import utils
|
||||
from lyric_search_new.constructors import LyricsResult
|
||||
|
||||
class InvalidResponseException(Exception):
    """Raised when a remote response is too short or not shaped as expected."""
|
||||
|
||||
class Genius:
    """Genius Search Module.

    Queries the Genius search endpoint, picks the closest hit with
    ``utils.TrackMatcher`` and scrapes the lyrics from that hit's page.
    """

    def __init__(self):
        self.label = "Genius"
        self.genius_url = private.GENIUS_URL
        self.genius_search_url = f'{self.genius_url}api/search/song?q='
        self.headers = common.SCRAPE_HEADERS
        self.timeout = ClientTimeout(connect=2, sock_read=4)
        self.datautils = utils.DataUtils()
        self.matcher = utils.TrackMatcher()

    @staticmethod
    def _extract_hits(search_data) -> list:
        """Return response->sections[0]->hits, or raise InvalidResponseException."""
        if not isinstance(search_data, dict):
            raise InvalidResponseException("Invalid JSON.")

        response = search_data.get('response')
        if not isinstance(response, dict):
            raise InvalidResponseException(f"Invalid JSON: Cannot find response key.\n{search_data}")

        sections = response.get('sections')
        if not isinstance(sections, list):
            raise InvalidResponseException(f"Invalid JSON: Cannot find response->sections key.\n{search_data}")

        # Guard the empty-list and non-dict cases too, so a malformed payload
        # is reported as such rather than as an IndexError/TypeError.
        if (not sections or not isinstance(sections[0], dict)
                or not isinstance(sections[0].get('hits'), list)):
            raise InvalidResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.")

        return sections[0]['hits']

    async def search(self, artist: str, song: str):
        """
        @artist: the artist to search
        @song: the song to search
        Returns:
            - LyricsResult for the best matching hit, **None** if nothing
              matched, no lyrics were found on the page, or any error
              occurred (the traceback is printed)
        """
        # Local import keeps this block self-contained.
        from urllib.parse import quote

        try:
            # Percent-encode the whole term so characters such as "&" or "#"
            # in a name cannot alter the query string.
            search_term = quote(f'{artist} {song}', safe='')

            async with ClientSession() as client:
                async with client.get(f'{self.genius_search_url}{search_term}',
                                      timeout=self.timeout,
                                      headers=self.headers) as request:
                    request.raise_for_status()
                    text = await request.text()

                    if len(text) < 100:
                        raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
                    search_data = await request.json()

                possible_matches = self._extract_hits(search_data)

                # Candidates for the matcher are (path, "ARTIST - TITLE").
                # Artist/title are also kept per path so they do not have to
                # be recovered by splitting on " - ", which breaks for names
                # that contain that separator themselves.
                to_scrape = []
                names_by_path = {}
                for hit in possible_matches:
                    hit_result = hit['result']
                    path = hit_result['path']
                    to_scrape.append(
                        (path, f"{hit_result['artist_names']} - {hit_result['title']}"))
                    names_by_path[path] = (hit_result['artist_names'], hit_result['title'])

                searched = f"{artist} - {song}"
                best_match = self.matcher.find_best_match(input_track=searched,
                                                          candidate_tracks=to_scrape)
                if not best_match:
                    # No candidate matched: a normal miss, not an error.
                    return None

                ((scrape_stub, _track), confidence) = best_match
                # The stub's first character is dropped before it is appended
                # to genius_url.
                scrape_url = f'{self.genius_url}{scrape_stub[1:]}'

                async with client.get(scrape_url,
                                      timeout=self.timeout,
                                      headers=self.headers) as scrape_request:
                    scrape_request.raise_for_status()
                    scrape_text = await scrape_request.text()

                if len(scrape_text) < 100:
                    raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)")

                html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser")
                divs = html.find_all("div", {"data-lyrics-container": "true"})
                if not divs:
                    return None

                returned_lyrics = ''.join(div.get_text() for div in divs)
                returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)

                matched_artist, matched_song = names_by_path[scrape_stub]
                return LyricsResult(artist=matched_artist,
                                    song=matched_song,
                                    src=self.label,
                                    lyrics=returned_lyrics,
                                    confidence=confidence)
        except Exception:
            # Best-effort source: report the failure and signal "no result".
            # Exception (not a bare except) lets asyncio.CancelledError and
            # KeyboardInterrupt propagate.
            traceback.print_exc()
            return None
|
||||
|
||||
|
||||
|
44
lyric_search_new/tests.py
Normal file
44
lyric_search_new/tests.py
Normal file
@ -0,0 +1,44 @@
|
||||
#!/usr/bin/env python3.12
|
||||
# tests
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
import sources.cache, sources.genius, sources.aggregate
|
||||
import utils
|
||||
|
||||
test_artist = "hopsin"
|
||||
test_song = "ill mind of hopsin 5"
|
||||
|
||||
async def test_cache(artist, song):
    """Search the cache source for a track and print the result (or a miss)."""
    found = await sources.cache.Cache().search(artist, song)
    if found:
        print(found.dict())
    else:
        print(f"Could not find {artist} - {song}!")
|
||||
|
||||
# print(f"artist: {ret_artist}\nsong: {ret_song}:\n{ret_lyr}")
|
||||
# print(result)
|
||||
|
||||
async def test_genius(artist=None, song=None):
    """Search Genius for a track (module defaults if either is missing) and print the result."""
    if not (artist and song):
        artist, song = test_artist, test_song
    print(await sources.genius.Genius().search(artist, song))
|
||||
|
||||
async def test_aggregate(artist=None, song=None):
    """Run an aggregate search (module defaults if either argument is missing) and print the result."""
    if not artist or not song:
        artist = test_artist
        song = test_song
    aggregate = sources.aggregate.Aggregate()
    result = await aggregate.search(artist, song)
    # Aggregate.search() returns None when no source finds the track;
    # calling .dict() on that would raise AttributeError.
    if not result:
        print(f"Could not find {artist} - {song}!")
        return
    print(result.dict())
|
||||
|
||||
|
||||
|
||||
# Run each check in turn on one dedicated event loop.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(test_genius())
    loop.run_until_complete(test_cache(artist=test_artist, song=test_song))
    loop.run_until_complete(test_aggregate())
finally:
    # Release the loop's resources even if one of the checks raises.
    loop.close()
|
@ -2,7 +2,7 @@
|
||||
|
||||
from difflib import SequenceMatcher
|
||||
from typing import List, Optional, Tuple
|
||||
import re
|
||||
import regex
|
||||
|
||||
class TrackMatcher:
|
||||
"""Track Matcher"""
|
||||
@ -16,16 +16,16 @@ class TrackMatcher:
|
||||
"""
|
||||
self.threshold = threshold
|
||||
|
||||
def find_best_match(self, input_track: str, candidate_tracks: List[str]) -> Optional[Tuple[str, float]]:
|
||||
def find_best_match(self, input_track: str, candidate_tracks: List[tuple[int|str, str]]) -> Optional[Tuple[str, float]]:
|
||||
"""
|
||||
Find the best matching track from the candidate list.
|
||||
|
||||
Args:
|
||||
input_track (str): Input track in "ARTIST - SONG" format
|
||||
candidate_tracks (List[str]): List of candidate tracks in same format
|
||||
candidate_tracks (List[tuple[int, str]]): List of candidate tracks
|
||||
|
||||
Returns:
|
||||
Optional[Tuple[str, float]]: Tuple of (best matching track, similarity score)
|
||||
Optional[Tuple[int, str, float]]: Tuple of (best matching track, similarity score)
|
||||
or None if no good match found
|
||||
"""
|
||||
if not input_track or not candidate_tracks:
|
||||
@ -34,11 +34,13 @@ class TrackMatcher:
|
||||
# Normalize input track
|
||||
input_track = self._normalize_string(input_track)
|
||||
|
||||
print(f"input_track: {input_track}")
|
||||
|
||||
best_match = None
|
||||
best_score = 0
|
||||
|
||||
for candidate in candidate_tracks:
|
||||
normalized_candidate = self._normalize_string(candidate)
|
||||
normalized_candidate = self._normalize_string(candidate[1])
|
||||
|
||||
# Calculate various similarity scores
|
||||
exact_score = 1.0 if input_track == normalized_candidate else 0.0
|
||||
@ -61,7 +63,7 @@ class TrackMatcher:
|
||||
extra spaces, and converting to lowercase.
|
||||
"""
|
||||
# Remove special characters and convert to lowercase
|
||||
text = re.sub(r'[^\w\s-]', '', text.lower())
|
||||
text = regex.sub(r'[^\w\s-]', '', text.lower())
|
||||
# Normalize spaces
|
||||
text = ' '.join(text.split())
|
||||
return text
|
||||
@ -80,3 +82,15 @@ class TrackMatcher:
|
||||
union = tokens1.union(tokens2)
|
||||
|
||||
return len(intersection) / len(union)
|
||||
|
||||
class DataUtils:
    """
    Data Utils
    """

    def scrub_lyrics(self, lyrics: str) -> str:
        """Apply the clean-up substitutions to *lyrics*, in order, and return the result."""
        # (pattern, replacement, flags), applied sequentially.
        substitutions = (
            (r'(\[.*?\])(\s){0,}(\:){0,1}', '', 0),       # bracketed tags, e.g. "[...]"
            (r'(\d?)(Embed\b)', '', regex.IGNORECASE),    # "Embed" with an optional leading digit
            (r'\n{2}', '\n', 0),                          # Gaps between verses
            (r'[0-9]\b$', '', 0),                         # single trailing digit
        )
        for pattern, replacement, flags in substitutions:
            lyrics = regex.sub(pattern, replacement, lyrics, flags=flags)
        return lyrics
|
||||
|
Loading…
x
Reference in New Issue
Block a user