#!/usr/bin/env python3.12
# pylint: disable=wrong-import-order, wrong-import-position, bare-except, broad-exception-caught
"""Lyric cache source: looks lyrics up in Redis first, then in a SQLite database."""

import os
import time
import regex
import logging
import sys
import traceback
# The project-local imports below rely on these search-path entries.
sys.path.insert(1,'..')
sys.path.insert(1,'.')
from typing import Optional, Any
import aiosqlite as sqlite3
from . import redis_cache
from lyric_search_new import utils
from lyric_search_new.constructors import LyricsResult

logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)


class Cache:
    """Cache Search Module"""

    def __init__(self) -> None:
        # Location of the SQLite database holding cached lyrics.
        self.cache_db: str = os.path.join("/", "var", "lib", "singerdbs", "cached_lyrics.db")
        self.redis_cache = redis_cache.RedisCache()
        # Pragmas executed on every new connection before the real query.
        self.cache_pre_query: str = (
            "pragma journal_mode = WAL; pragma synchronous = normal; "
            "pragma temp_store = memory; pragma mmap_size = 30000000000;"
        )
        # Loadable SQLite extensions; the queries below call editdist3(),
        # which is not built into SQLite itself.
        self.sqlite_exts: list[str] = [
            '/usr/local/lib/python3.11/dist-packages/spellfix1.cpython-311-x86_64-linux-gnu.so',
        ]
        self.label: str = "Cache"

    def get_matched(self, matched_candidate: tuple, confidence: int,
                    sqlite_rows: Optional[list[sqlite3.Row]] = None,
                    redis_results: Any = None) -> Optional[LyricsResult]:
        """
        Get Matched Result
        Args:
            matched_candidate (tuple): the correctly matched candidate returned by matcher.best_match;
                its first element is the row id / Redis key to look for
            confidence (int): % confidence (used for SQLite rows; Redis rows carry their own)
            sqlite_rows (list[sqlite3.Row]|None): List of returned rows from SQLite DB, or None if Redis
            redis_results (Any): List of (key, row) pairs returned by Redis, or None if SQLite
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
        """
        matched_id: int = matched_candidate[0]
        if redis_results:
            for (key, row) in redis_results:
                if key == matched_id:
                    logging.info("Matched row: %s", row)
                    return LyricsResult(
                        artist=row['artist'],
                        song=row['song'],
                        lyrics=row['lyrics'],
                        src=f"{row['src']} (redis cache, id: {key})",
                        confidence=row['confidence'],
                    )
        else:
            # sqlite_rows defaults to None; treat that as "no rows" instead of
            # raising TypeError when neither result set was supplied.
            for row in sqlite_rows or ():
                if row[0] == matched_id:
                    (_id, artist, song, lyrics, original_src, _confidence) = row
                    return LyricsResult(
                        artist=artist,
                        song=song,
                        lyrics=lyrics,
                        src=f"{original_src} (cached, id: {_id})",
                        confidence=confidence,
                    )
        return None

    async def check_existence(self, artistsong: str) -> Optional[bool]:
        """
        Check whether lyrics are already stored for track
        Args:
            artistsong (str): artist and song in artist\\nsong format
        Returns:
            bool: Whether track was found in cache
        Raises:
            IndexError: if artistsong contains no newline separator
        """
        logging.debug("Checking whether %s is already stored",
                      artistsong.replace("\n", " - "))
        # String literals use single quotes: SQLite only accepts double-quoted
        # literals through a legacy fallback that can be compiled out.
        check_query: str = (
            "SELECT id, artist, song FROM lyrics "
            "WHERE editdist3((lower(artist) || ' ' || lower(song)), (? || ' ' || ?)) <= 410 "
            "ORDER BY editdist3((lower(artist) || ' ' || lower(song)), ?) ASC LIMIT 1"
        )
        artistsong_split = artistsong.split("\n", maxsplit=1)
        artist = artistsong_split[0].lower()
        song = artistsong_split[1].lower()
        # Order by distance to the same "artist song" form the WHERE clause
        # compares against (only the truthiness of the row is used below).
        params = (artist, song, f"{artist} {song}")
        async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
            await db_conn.enable_load_extension(True)
            for ext in self.sqlite_exts:
                await db_conn.load_extension(ext)
            async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
                async with await db_conn.execute(check_query, params) as db_cursor:
                    result = await db_cursor.fetchone()
                    if result:
                        logging.debug("%s is already stored.",
                                      artistsong.replace("\n", " - "))
                        return True
        logging.debug("%s cleared to be stored.", artistsong)
        return False

    async def store(self, lyr_result: LyricsResult) -> None:
        """
        Store lyrics (SQLite, then Redis)
        Args:
            lyr_result (LyricsResult): the returned lyrics to cache
        Returns: None
        """
        sqlite_insert_id = await self.sqlite_store(lyr_result)
        # Redis is only populated when the SQLite insert produced a row id.
        if sqlite_insert_id:
            await self.redis_cache.redis_store(sqlite_insert_id, lyr_result)

    async def sqlite_store(self, lyr_result: LyricsResult) -> Optional[int]:
        """
        Store lyrics to SQLite Cache
        Args:
            lyr_result (LyricsResult): the returned lyrics to cache
        Returns:
            int|None: the inserted row id, or None when storage was skipped or failed
        """
        logging.info("Storing %s",
                     f"{lyr_result.artist} - {lyr_result.song}")

        if lyr_result.src.lower() == "cache":
            logging.info("Skipping cache storage - returned LyricsResult originated from cache")
            return None

        artistsong = f"{lyr_result.artist}\n{lyr_result.song}"
        if await self.check_existence(artistsong):
            logging.info("Skipping cache storage - %s is already stored.",
                         artistsong.replace("\n", " - "))
            return None

        try:
            # Flatten line breaks into " / " separators, then squeeze whitespace runs.
            # NOTE(review): the first alternative of this pattern was illegible in the
            # reviewed copy of this file; "<br>" is assumed - confirm against VCS.
            lyrics = regex.sub(r'(<br>|\n|\r\n)', ' / ', lyr_result.lyrics.strip())
            lyrics = regex.sub(r'\s{2,}', ' ', lyrics)

            insert_query = (
                "INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics) "
                "VALUES(?, ?, ?, ?, ?, ?, ?)"
            )
            params = (lyr_result.src, time.time(), lyr_result.artist,
                      lyr_result.song, artistsong, lyr_result.confidence, lyrics)

            async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
                async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
                    async with await db_conn.execute(insert_query, params) as _cursor:
                        await db_conn.commit()
                        logging.info("Stored %s to SQLite!", artistsong.replace("\n", " - "))
                        return _cursor.lastrowid
        except Exception:
            # Storage is best-effort, so failures are reported and swallowed.
            # Exception (not a bare except) lets asyncio.CancelledError and
            # KeyboardInterrupt propagate.
            logging.critical("Cache storage error!")
            traceback.print_exc()
        return None

    @staticmethod
    def _pick_best_match(matcher: Any, input_track: str, candidates: list,
                         random_search: bool) -> Optional[tuple]:
        """Return (candidate, confidence) for the best candidate, or None if there is none."""
        if random_search:
            # A random search accepts whatever came back first, at 100% confidence.
            return (candidates[0], 100) if candidates else None
        return matcher.find_best_match(input_track=input_track,
                                       candidate_tracks=candidates)

    # pylint: disable=unused-argument
    async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
        """
        Cache Search
        Args:
            artist: the artist to search
            song: the song to search
        Returns:
            LyricsResult|None: The result, if found - None otherwise.
        Notes:
            Passing "!" for both artist and song requests a random cached track.
            Errors are printed and reported as None; task cancellation propagates.
        """
        try:
            # pylint: enable=unused-argument
            artist = artist.strip().lower()
            song = song.strip().lower()
            input_track: str = f"{artist} - {song}"
            search_query: Optional[str] = None
            search_params: Optional[tuple] = None
            random_search: bool = False
            time_start: float = time.time()
            matcher = utils.TrackMatcher()

            if artist == "!" and song == "!":
                random_search = True
                search_query = ("SELECT id, artist, song, lyrics, src, confidence "
                                "FROM lyrics ORDER BY RANDOM() LIMIT 1")

            logging.info("Searching %s - %s on %s",
                         artist, song, self.label)

            # Check Redis first
            logging.info("Checking redis cache for %s...",
                         f"{artist} - {song}")
            redis_result = await self.redis_cache.search(artist=artist, song=song)

            if redis_result:
                result_tracks: list = [
                    (key, f"{track['artist']} - {track['song']}")
                    for (key, track) in redis_result
                ]
                best_match = self._pick_best_match(matcher, input_track,
                                                   result_tracks, random_search)
                if best_match:
                    (candidate, confidence) = best_match
                    matched = self.get_matched(redis_results=redis_result,
                                               matched_candidate=candidate,
                                               confidence=confidence)
                    if matched:
                        matched.confidence = confidence
                        matched.time = time.time() - time_start
                        logging.info("Found %s on redis cache, skipping SQLite...",
                                     f"{artist} - {song}")
                        return matched

            # SQLite: fallback when Redis had nothing usable
            async with sqlite3.connect(self.cache_db, timeout=2) as db_conn:
                await db_conn.enable_load_extension(True)
                for ext in self.sqlite_exts:
                    await db_conn.load_extension(ext)
                async with await db_conn.executescript(self.cache_pre_query) as _db_cursor:
                    if not random_search:
                        # Single-quoted literals: see check_existence-style queries;
                        # double-quoted literals depend on a legacy SQLite fallback.
                        search_query = (
                            "SELECT id, artist, song, lyrics, src, confidence FROM lyrics "
                            "WHERE editdist3((lower(artist) || ' ' || lower(song)), (? || ' ' || ?)) <= 410 "
                            "ORDER BY editdist3((lower(artist) || ' ' || lower(song)), ?) ASC LIMIT 10"
                        )
                        # artist/song were already stripped and lower-cased above.
                        search_params = (artist, song, f"{artist} {song}")

                    async with await _db_cursor.execute(search_query, search_params) as db_cursor:
                        results: list = await db_cursor.fetchall()
                        result_tracks = [
                            (_id, f"{_artist} - {_song}")
                            for (_id, _artist, _song, _lyrics, _src, _confidence) in results
                        ]
                        best_match = self._pick_best_match(matcher, input_track,
                                                           result_tracks, random_search)
                        if not best_match:
                            return None

                        (candidate, confidence) = best_match
                        logging.info("Result found on %s", self.label)
                        matched = self.get_matched(sqlite_rows=results,
                                                   matched_candidate=candidate,
                                                   confidence=confidence)
                        if matched is None:
                            # Matcher picked an id that is not among the rows.
                            return None
                        matched.time = time.time() - time_start
                        return matched
        except Exception:
            # Report and swallow lookup errors; a bare except here would also
            # swallow asyncio.CancelledError and break task cancellation.
            traceback.print_exc()
            return None