# isort: skip_file
# NOTE: import order in this module is significant (see the monkey-patch below),
# which is why isort is disabled for the whole file.
from typing import Optional, Any, Callable
from uuid import uuid4
from urllib.parse import urlparse
from pathlib import Path
import hashlib
import traceback
import logging
import random
import asyncio
import json
import os
import aiohttp
import time
import base64

# Monkey-patch streamrip's Tidal client credentials BEFORE importing TidalClient
import streamrip.client.tidal as _tidal_module  # type: ignore  # noqa: E402

# Credentials are stored base64-encoded and decoded once at import time.
CLIENT_ID = base64.b64decode("ZlgySnhkbW50WldLMGl4VA==").decode("iso-8859-1")
CLIENT_SECRET = base64.b64decode(
    "MU5tNUFmREFqeHJnSkZKYktOV0xlQXlLR1ZHbUlOdVhQUExIVlhBdnhBZz0=",
).decode("iso-8859-1")
_tidal_module.CLIENT_ID = CLIENT_ID
_tidal_module.CLIENT_SECRET = CLIENT_SECRET
# Rebuild the module-level BasicAuth object so it uses the patched credentials.
_tidal_module.AUTH = aiohttp.BasicAuth(
    login=_tidal_module.CLIENT_ID, password=_tidal_module.CLIENT_SECRET
)

from streamrip.client import TidalClient  # type: ignore  # noqa: E402
from streamrip.config import Config as StreamripConfig  # type: ignore  # noqa: E402
from dotenv import load_dotenv  # noqa: E402
from rapidfuzz import fuzz  # noqa: E402

# Path to persist Tidal tokens across restarts
TIDAL_TOKEN_CACHE_PATH = Path(__file__).parent.parent / "tidal_token.json"


class MetadataFetchError(Exception):
    """Raised when metadata fetch permanently fails after retries."""


# How long before token expiry to proactively refresh (seconds)
TIDAL_TOKEN_REFRESH_BUFFER = 600  # 10 minutes
# Maximum age of a session before forcing a fresh login (seconds)
TIDAL_SESSION_MAX_AGE = 1800  # 30 minutes

# Suppress noisy logging from this module and from the `streamrip` library
# We set propagate=False so messages don't bubble up to the root logger and
# attach a NullHandler where appropriate to avoid "No handler found" warnings.
for name in [__name__, "utils.sr_wrapper", "streamrip", "streamrip.client"]: logger = logging.getLogger(name) # Keep default level (or raise to WARNING) so non-important logs are dropped try: logger.setLevel(logging.WARNING) except Exception: pass logger.propagate = False # Ensure a NullHandler is present so logs don't propagate and no missing-handler # warnings are printed when the package emits records. if not any(isinstance(h, logging.NullHandler) for h in logger.handlers): logger.addHandler(logging.NullHandler()) load_dotenv() class SRUtil: """ StreamRip API Utility Class """ def __init__(self) -> None: """Initialize StreamRip utility.""" self.streamrip_config = StreamripConfig.defaults() self._load_tidal_config() self.streamrip_config.session.conversion.enabled = False self.streamrip_config.session.downloads.folder = os.getenv( "tidal_download_folder", "" ) self.streamrip_client = TidalClient(self.streamrip_config) self.MAX_CONCURRENT_METADATA_REQUESTS = 2 self.METADATA_RATE_LIMIT = 1.25 self.METADATA_SEMAPHORE = asyncio.Semaphore( self.MAX_CONCURRENT_METADATA_REQUESTS ) self.LAST_METADATA_REQUEST = 0 self.MAX_METADATA_RETRIES = 5 self.METADATA_ALBUM_CACHE: dict[str, dict] = {} self.RETRY_DELAY = 1.0 # seconds between retries # Callback invoked when a 429 is first observed. Signature: (Exception) -> None or async self.on_rate_limit: Optional[Callable[[Exception], Any]] = None # Internal flag to avoid repeated notifications for the same runtime self._rate_limit_notified = False # Track when we last successfully logged in self._last_login_time: Optional[float] = None # Track last successful API call self._last_successful_request: Optional[float] = None # Keepalive task handle self._keepalive_task: Optional[asyncio.Task] = None # Keepalive interval in seconds self.KEEPALIVE_INTERVAL = 180 # 3 minutes async def start_keepalive(self) -> None: """Start the background keepalive task. 
This should be called once at startup to ensure the Tidal session stays alive even during idle periods. """ if self._keepalive_task and not self._keepalive_task.done(): logging.info("Tidal keepalive task already running") return # Ensure initial login try: await self._login_and_persist() logging.info("Initial Tidal login successful") except Exception as e: logging.warning("Initial Tidal login failed: %s", e) self._keepalive_task = asyncio.create_task(self._keepalive_runner()) logging.info("Tidal keepalive task started") async def stop_keepalive(self) -> None: """Stop the background keepalive task.""" if self._keepalive_task and not self._keepalive_task.done(): self._keepalive_task.cancel() try: await self._keepalive_task except asyncio.CancelledError: pass logging.info("Tidal keepalive task stopped") async def _keepalive_runner(self) -> None: """Background task to keep the Tidal session alive.""" while True: try: await asyncio.sleep(self.KEEPALIVE_INTERVAL) # Check if we've had recent activity if self._last_successful_request: time_since_last = time.time() - self._last_successful_request if time_since_last < self.KEEPALIVE_INTERVAL: # Recent activity, no need to ping continue # Check if token is expiring soon and proactively refresh if self._is_token_expiring_soon(): logging.info("Tidal keepalive: Token expiring soon, refreshing...") try: await self._login_and_persist(force=True) logging.info("Tidal keepalive: Token refresh successful") except Exception as e: logging.warning("Tidal keepalive: Token refresh failed: %s", e) continue # Check if session is stale if self._is_session_stale(): logging.info("Tidal keepalive: Session stale, refreshing...") try: await self._login_and_persist(force=True) logging.info("Tidal keepalive: Session refresh successful") except Exception as e: logging.warning( "Tidal keepalive: Session refresh failed: %s", e ) continue # Make a lightweight API call to keep the session alive if self.streamrip_client.logged_in: try: # Simple search to 
keep the connection alive await self._safe_api_call( self.streamrip_client.search, media_type="artist", query="test", retries=1, ) logging.debug("Tidal keepalive ping successful") except Exception as e: logging.warning("Tidal keepalive ping failed: %s", e) # Try to refresh the session try: await self._login_and_persist(force=True) except Exception: pass except asyncio.CancelledError: logging.info("Tidal keepalive task cancelled") break except Exception as e: logging.error("Error in Tidal keepalive task: %s", e) def _load_tidal_config(self) -> None: """Load Tidal config from cache file if available, otherwise from env.""" tidal = self.streamrip_config.session.tidal cached = self._load_cached_tokens() if cached: tidal.user_id = cached.get("user_id", "") tidal.access_token = cached.get("access_token", "") tidal.refresh_token = cached.get("refresh_token", "") tidal.token_expiry = cached.get("token_expiry", "") tidal.country_code = cached.get( "country_code", os.getenv("tidal_country_code", "") ) else: tidal.user_id = os.getenv("tidal_user_id", "") tidal.access_token = os.getenv("tidal_access_token", "") tidal.refresh_token = os.getenv("tidal_refresh_token", "") tidal.token_expiry = os.getenv("tidal_token_expiry", "") tidal.country_code = os.getenv("tidal_country_code", "") tidal.quality = int(os.getenv("tidal_default_quality", 2)) def _load_cached_tokens(self) -> Optional[dict]: """Load cached tokens from disk if valid.""" try: if TIDAL_TOKEN_CACHE_PATH.exists(): with open(TIDAL_TOKEN_CACHE_PATH, "r") as f: data = json.load(f) # Validate required fields exist if all( k in data for k in ("access_token", "refresh_token", "token_expiry") ): logging.info("Loaded Tidal tokens from cache") return data except Exception as e: logging.warning("Failed to load cached Tidal tokens: %s", e) return None def _save_cached_tokens(self) -> None: """Persist current tokens to disk for use across restarts.""" try: tidal = self.streamrip_config.session.tidal data = { "user_id": 
tidal.user_id, "access_token": tidal.access_token, "refresh_token": tidal.refresh_token, "token_expiry": tidal.token_expiry, "country_code": tidal.country_code, } with open(TIDAL_TOKEN_CACHE_PATH, "w") as f: json.dump(data, f) logging.info("Saved Tidal tokens to cache") except Exception as e: logging.warning("Failed to save Tidal tokens: %s", e) def _apply_new_tokens(self, auth_info: dict) -> None: """Apply new tokens from device auth to config.""" tidal = self.streamrip_config.session.tidal tidal.user_id = str(auth_info.get("user_id", "")) tidal.access_token = auth_info.get("access_token", "") tidal.refresh_token = auth_info.get("refresh_token", "") tidal.token_expiry = auth_info.get("token_expiry", "") tidal.country_code = auth_info.get("country_code", tidal.country_code) self._save_cached_tokens() async def start_device_auth(self) -> tuple[str, str]: """Start device authorization flow. Returns: tuple: (device_code, verification_url) - User should visit the URL to authorize. """ if ( not hasattr(self.streamrip_client, "session") or not self.streamrip_client.session ): self.streamrip_client.session = await self.streamrip_client.get_session() device_code, verification_url = await self.streamrip_client._get_device_code() return device_code, verification_url async def check_device_auth(self, device_code: str) -> tuple[bool, Optional[str]]: """Check if user has completed device authorization. 
Args: device_code: The device code from start_device_auth() Returns: tuple: (success, error_message) - (True, None) if auth completed successfully - (False, "pending") if user hasn't authorized yet - (False, error_message) if auth failed """ status, auth_info = await self.streamrip_client._get_auth_status(device_code) if status == 0: # Success - apply new tokens self._apply_new_tokens(auth_info) # Re-login with new tokens self.streamrip_client.logged_in = False try: await self.streamrip_client.login() self._save_cached_tokens() return True, None except Exception as e: return False, f"Login after auth failed: {e}" elif status == 2: # Pending - user hasn't authorized yet return False, "pending" else: # Failed return False, "Authorization failed" def _is_token_expiring_soon(self) -> bool: """Check if the token is about to expire within the buffer window.""" tidal = self.streamrip_config.session.tidal token_expiry = getattr(tidal, "token_expiry", None) if not token_expiry: return True # No expiry info means we should refresh try: # token_expiry can be a Unix timestamp (float/int/string) or ISO string if not isinstance(token_expiry, str): expiry_ts = float(token_expiry) else: # Try parsing as a numeric Unix timestamp first try: expiry_ts = float(token_expiry) except ValueError: # Fall back to ISO format string from datetime import datetime expiry_dt = datetime.fromisoformat( token_expiry.replace("Z", "+00:00") ) expiry_ts = expiry_dt.timestamp() return expiry_ts < (time.time() + TIDAL_TOKEN_REFRESH_BUFFER) except Exception as e: logging.warning("Failed to parse token expiry '%s': %s", token_expiry, e) return True # Err on the side of refreshing def _is_session_stale(self) -> bool: """Check if the login session is too old and should be refreshed.""" if not self._last_login_time: return True session_age = time.time() - self._last_login_time return session_age > TIDAL_SESSION_MAX_AGE async def _force_fresh_login(self) -> bool: """Force a complete fresh login, ignoring 
logged_in state. Returns True if login succeeded, False otherwise. """ # Reset the logged_in flag to force a fresh login self.streamrip_client.logged_in = False # Close existing session if present if hasattr(self.streamrip_client, "session") and self.streamrip_client.session: try: if not self.streamrip_client.session.closed: await self.streamrip_client.session.close() except Exception as e: logging.warning("Error closing old session: %s", e) # Use object.__setattr__ to bypass type checking for session reset try: object.__setattr__(self.streamrip_client, "session", None) except Exception: pass # Session will be recreated on next login try: logging.info("Forcing fresh Tidal login...") await self.streamrip_client.login() self._last_login_time = time.time() self._save_cached_tokens() logging.info("Fresh Tidal login successful") return True except Exception as e: logging.warning( "Forced Tidal login failed: %s - device re-auth may be required", e ) return False async def _login_and_persist(self, force: bool = False) -> None: """Login to Tidal and persist any refreshed tokens. Args: force: If True, force a fresh login even if already logged in. This method now checks for: 1. Token expiry - refreshes if token is about to expire 2. Session age - refreshes if session is too old 3. logged_in state - logs in if not logged in If refresh fails, logs a warning but does not raise. 
""" needs_login = force or not self.streamrip_client.logged_in # Check if token is expiring soon if not needs_login and self._is_token_expiring_soon(): logging.info("Tidal token expiring soon, will refresh") needs_login = True # Check if session is too old if not needs_login and self._is_session_stale(): logging.info("Tidal session is stale, will refresh") needs_login = True if not needs_login: return try: # Reset logged_in to ensure fresh login attempt if force or self._is_token_expiring_soon(): self.streamrip_client.logged_in = False await self.streamrip_client.login() self._last_login_time = time.time() # After login, tokens may have been refreshed - persist them self._save_cached_tokens() logging.info("Tidal login/refresh successful") except Exception as e: logging.warning( "Tidal login/refresh failed: %s - device re-auth may be required", e ) # Don't mark as logged in on failure - let subsequent calls retry async def rate_limited_request(self, func, *args, **kwargs): """Rate-limited wrapper that also ensures login before making requests.""" async with self.METADATA_SEMAPHORE: now = time.time() elapsed = now - self.LAST_METADATA_REQUEST if elapsed < self.METADATA_RATE_LIMIT: await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed) # Ensure we're logged in before making the request try: await self._login_and_persist() except Exception as e: logging.warning( "Pre-request login failed in rate_limited_request: %s", e ) result = await func(*args, **kwargs) self.LAST_METADATA_REQUEST = time.time() return result async def _safe_api_call( self, func, *args, retries: int = 3, backoff: float = 0.5, **kwargs ): """Call an async API function with resilient retry behavior. - On AttributeError: attempt a `login()` once and retry. - On connection-related errors (aiohttp.ClientError, OSError, Timeout): attempt a `login()` and retry up to `retries` times. 
- On 400/429 responses (message contains '400' or '429'): retry with backoff without triggering login (to avoid excessive logins). - On 401 (Unauthorized): force a fresh login and retry. Returns the result or raises the last exception. """ last_exc: Optional[Exception] = None for attempt in range(retries): try: # Before each attempt, ensure we have a valid session if attempt == 0: # On first attempt, try to ensure logged in (checks token expiry) # Wrapped in try/except so login failures don't block the API call try: await self._login_and_persist() except Exception as login_err: logging.warning( "Pre-request login failed: %s (continuing anyway)", login_err, ) result = await func(*args, **kwargs) # Track successful request self._last_successful_request = time.time() return result except AttributeError as e: # Probably missing/closed client internals: try re-login once last_exc = e logging.warning( "AttributeError in API call (attempt %d/%d): %s", attempt + 1, retries, e, ) try: await self._force_fresh_login() except Exception: pass continue except Exception as e: last_exc = e msg = str(e) # Treat 400/429 as transient rate-limit/server responses — retry without login if ("400" in msg or "429" in msg) and attempt < retries - 1: # Notify on the first observed 429 (if a callback is set) try: if ( "429" in msg and not self._rate_limit_notified and self.on_rate_limit ): self._rate_limit_notified = True try: if asyncio.iscoroutinefunction(self.on_rate_limit): asyncio.create_task(self.on_rate_limit(e)) else: loop = asyncio.get_running_loop() loop.run_in_executor(None, self.on_rate_limit, e) except Exception: pass except Exception: pass await asyncio.sleep(backoff * (2**attempt)) continue # Treat 401 (Unauthorized) as an auth failure: force a fresh re-login then retry is_401_error = ( ( isinstance(e, aiohttp.ClientResponseError) and getattr(e, "status", None) == 401 ) or "401" in msg or "unauthorized" in msg.lower() ) if is_401_error: logging.warning( "Received 
401/Unauthorized from Tidal (attempt %d/%d). Forcing fresh re-login...", attempt + 1, retries, ) try: # Use force=True to ensure we actually re-authenticate login_success = await self._force_fresh_login() if login_success: logging.info("Forced re-login after 401 successful") else: logging.warning( "Forced re-login after 401 failed - may need device re-auth" ) except Exception as login_exc: logging.warning("Forced login after 401 failed: %s", login_exc) if attempt < retries - 1: await asyncio.sleep(backoff * (2**attempt)) continue # Connection related errors — try to re-login then retry if ( isinstance( e, ( aiohttp.ClientError, OSError, ConnectionError, asyncio.TimeoutError, ), ) or "Connection" in msg or "closed" in msg.lower() ): try: await self._login_and_persist(force=True) except Exception: pass if attempt < retries - 1: await asyncio.sleep(backoff * (2**attempt)) continue # Unhandled / permanent error: re-raise after loop ends # If we reach here, raise the last exception if last_exc: raise last_exc return None def is_fuzzy_match(self, expected, actual, threshold=80): if not expected or not actual: return False return fuzz.token_set_ratio(expected.lower(), actual.lower()) >= threshold def is_metadata_match( self, expected_artist, expected_album, expected_title, found_artist, found_album, found_title, threshold=80, ): artist_match = self.is_fuzzy_match(expected_artist, found_artist, threshold) album_match = ( self.is_fuzzy_match(expected_album, found_album, threshold) if expected_album else True ) title_match = self.is_fuzzy_match(expected_title, found_title, threshold) return artist_match and album_match and title_match def dedupe_by_key(self, key: str | list[str], entries: list[dict]) -> list[dict]: """Return entries de-duplicated by one or more keys.""" keys = [key] if isinstance(key, str) else list(key) if not keys: return entries def normalize(value: Any) -> str: return str(value or "").strip().lower() deduped: dict[tuple[str, ...], dict] = {} for entry in 
entries: composite_key = tuple(normalize(entry.get(k)) for k in keys) if composite_key not in deduped: deduped[composite_key] = entry return list(deduped.values()) def group_artists_by_name( self, entries: list[dict], query: Optional[str] = None ) -> list[dict]: """ Group artist entries by normalized display name and pick a primary candidate per name. Returns a list of dicts where each dict contains the primary candidate plus an `alternatives` list for other artists that share the same display name. Scoring/selection policy: - If `query` is provided, prefer an exact case-insensitive match. - Otherwise prefer the entry with highest fuzzy match to `query`. - Use `popularity` as a tiebreaker. This keeps a single line in an autocomplete dropdown while preserving the alternate choices (IDs) so the UI can show a submenu or a secondary picker. """ buckets: dict[str, list[dict]] = {} for e in entries: name = e.get("artist", "") norm = name.strip().lower() buckets.setdefault(norm, []).append(e) out: list[dict] = [] for norm, items in buckets.items(): if len(items) == 1: primary = items[0] alternatives: list[dict] = [] else: # Score each item scored = [] for it in items: score = 0.0 if query: try: if ( it.get("artist", "").strip().lower() == query.strip().lower() ): score += 1000.0 else: score += float( fuzz.token_set_ratio(query, it.get("artist", "")) ) except Exception: score += 0.0 # add small weight for popularity if present pop = it.get("popularity") or 0 try: score += float(pop) / 100.0 except Exception: pass scored.append((score, it)) scored.sort(key=lambda x: x[0], reverse=True) primary = scored[0][1] alternatives = [it for _, it in scored[1:]] out.append( { "artist": primary.get("artist"), "id": primary.get("id"), "popularity": primary.get("popularity"), "alternatives": alternatives, } ) return out def format_duration(self, seconds): if not seconds: return None m, s = divmod(seconds, 60) return f"{m}:{s:02}" def _get_tidal_cover_url(self, uuid, size): """Generate a 
tidal cover url. :param uuid: VALID uuid string :param size: """ TIDAL_COVER_URL = ( "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" ) possibles = (80, 160, 320, 640, 1280) assert size in possibles, f"size must be in {possibles}" return TIDAL_COVER_URL.format( uuid=uuid.replace("-", "/"), height=size, width=size, ) def combine_album_track_metadata( self, album_json: dict | None, track_json: dict ) -> dict: """ Combine album-level and track-level metadata into a unified tag dictionary. Track-level metadata overrides album-level where relevant. """ album_json = album_json or {} # Album-level combined = { "album": album_json.get("title"), "album_artist": album_json.get("artist", {}).get("name"), "release_date": album_json.get("releaseDate"), "album_type": album_json.get("type"), "total_tracks": album_json.get("numberOfTracks"), "upc": album_json.get("upc"), "album_copyright": album_json.get("copyright"), "album_cover_id": album_json.get("cover"), "album_cover_url": ( f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg" if album_json.get("cover") else None ), } # Track-level (overrides or adds to album info) combined.update( { "title": track_json.get("title"), "artist": track_json.get("artist", {}).get("name"), "artists": [a.get("name") for a in track_json.get("artists", [])], "track_number": track_json.get("trackNumber"), "disc_number": track_json.get("volumeNumber"), "duration": track_json.get("duration"), "isrc": track_json.get("isrc"), "bpm": track_json.get("bpm"), "explicit": track_json.get("explicit"), "replaygain": track_json.get("replayGain"), "peak": track_json.get("peak"), "lyrics": track_json.get("lyrics"), "track_copyright": track_json.get("copyright"), "cover_id": track_json.get("album", {}).get("cover") or album_json.get("cover"), "cover_url": ( f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg" if ( track_json.get("album", {}).get("cover") or 
album_json.get("cover") ) else None ), } ) return combined def combine_album_with_all_tracks( self, album_json: dict[str, Any] ) -> list[dict[str, Any]]: """Return a list of combined metadata dicts for all tracks in an album JSON.""" return [ self.combine_album_track_metadata(album_json, t) for t in album_json.get("tracks", []) ] async def get_artists_by_name( self, artist_name: str, group: bool = False ) -> Optional[list]: """Get artist(s) by name. Args: artist_name: query string to search for. group: if True return grouped results (one primary per display name with `alternatives` list). If False return raw search items (legacy shape). Retry login only on authentication failure. Rate limit and retry on 400/429. """ artists_out: list[dict] = [] max_retries = 4 delay = 1.0 for attempt in range(max_retries): try: artists = await self._safe_api_call( self.streamrip_client.search, media_type="artist", query=artist_name, retries=3, ) break except Exception as e: msg = str(e) if ("400" in msg or "429" in msg) and attempt < max_retries - 1: await asyncio.sleep(delay) delay *= 2 continue else: return None else: return None # `artists` can be None or a list of result pages — guard accordingly if not artists: return None # If the client returned paged results (list), pick first page dict if isinstance(artists, list): artists_page = artists[0] if len(artists) > 0 else {} else: artists_page = artists artists_items = ( artists_page.get("items", []) if isinstance(artists_page, dict) else [] ) if not artists_items: return None artists_out = [ { "artist": res["name"], "id": res["id"], "popularity": res.get("popularity", 0), } for res in artists_items if "name" in res and "id" in res ] if group: return self.group_artists_by_name(artists_out, query=artist_name) return artists_out async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]: """Get albums by artist ID. Retry login only on authentication failure. 
Rate limit and retry on 400/429.""" artist_id_str: str = str(artist_id) albums_out: list[dict] = [] max_retries = 4 delay = 1.0 for attempt in range(max_retries): try: metadata = await self._safe_api_call( self.streamrip_client.get_metadata, artist_id_str, "artist", retries=3, ) break except Exception as e: msg = str(e) if ("400" in msg or "429" in msg) and attempt < max_retries - 1: await asyncio.sleep(delay) delay *= 2 continue else: return None else: return None if not metadata: return None albums = self.dedupe_by_key( ["title", "releaseDate"], metadata.get("albums", []) ) albums_out = [ { "artist": ", ".join(artist["name"] for artist in album["artists"]), "album": album["title"], "id": album["id"], "release_date": album.get("releaseDate", "Unknown"), } for album in albums if "title" in album and "id" in album and "artists" in album ] return albums_out async def get_album_by_name(self, artist: str, album: str) -> Optional[dict]: """Get album by artist and album name using artist ID and fuzzy matching. Try first 8 chars, then 12 if no match. Notify on success.""" # Notification moved to add_cover_art.py as requested for trunc in (8, 12): search_artist = artist[:trunc] artists = await self.get_artists_by_name(search_artist) if not artists: continue best_artist = None best_artist_score = 0 for a in artists: score = fuzz.token_set_ratio(artist, a["artist"]) if score > best_artist_score: best_artist = a best_artist_score = int(score) if not best_artist or best_artist_score < 85: continue artist_id = best_artist["id"] albums = await self.get_albums_by_artist_id(artist_id) if not albums: continue best_album = None best_album_score = 0 for alb in albums: score = fuzz.token_set_ratio(album, alb["album"]) if score > best_album_score: best_album = alb best_album_score = int(score) if best_album and best_album_score >= 85: return best_album return None async def get_cover_by_album_id( self, album_id: int, size: int = 640 ) -> Optional[str]: """Get cover URL by album ID. 
Retry login only on authentication failure.""" if size not in [80, 160, 320, 640, 1280]: return None album_id_str: str = str(album_id) for attempt in range(2): try: metadata = await self._safe_api_call( self.streamrip_client.get_metadata, item_id=album_id_str, media_type="album", retries=2, ) break except Exception: if attempt == 1: return None else: return None if not metadata: return None cover_id = metadata.get("cover") if not cover_id: return None cover_url = self._get_tidal_cover_url(cover_id, size) return cover_url async def get_tracks_by_album_id( self, album_id: int, quality: str = "FLAC" ) -> Optional[list | dict]: """Get tracks by album ID Args: album_id (int): The ID of the album. Returns: Optional[list[dict]]: List of tracks or None if not found. """ album_id_str = str(album_id) try: metadata = await self._safe_api_call( self.streamrip_client.get_metadata, item_id=album_id_str, media_type="album", retries=2, ) except Exception as e: logging.warning("get_tracks_by_album_id failed: %s", e) return None if not metadata: logging.warning("No metadata found for album ID: %s", album_id) return None track_list = metadata.get("tracks", []) tracks_out: list[dict] = [ { "id": track.get("id"), "artist": track.get("artist").get("name"), "title": track.get("title"), "duration": self.format_duration(track.get("duration", 0)), "version": track.get("version"), "audioQuality": track.get("audioQuality"), } for track in track_list ] return tracks_out async def get_tracks_by_artist_song( self, artist: str, song: str, n: int = 0 ) -> Optional[list]: """Get track by artist and song name Args: artist (str): The name of the artist. song (str): The name of the song. Returns: Optional[dict]: The track details or None if not found. 
TODO: Reimplement using StreamRip """ try: # _safe_api_call already handles login, no need to call it here search_res = await self._safe_api_call( self.streamrip_client.search, media_type="track", query=f"{artist} - {song}", retries=3, ) logging.debug("Search result: %s", search_res) return ( search_res[0].get("items") if search_res and isinstance(search_res, list) else [] ) except Exception as e: logging.warning("Search Exception: %s", str(e)) if n < 2: # Reduce max retries from 3 to 2 n += 1 await asyncio.sleep(0.5 * n) # Add backoff return await self.get_tracks_by_artist_song(artist, song, n) return [] async def get_stream_url_by_track_id( self, track_id: int, quality: str = "FLAC" ) -> Optional[str]: """Get stream URL by track ID Args: track_id (int): The ID of the track. quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW Returns: Optional[str]: The stream URL or None if not found. """ if quality not in ["FLAC", "Lossy"]: logging.error("Invalid quality requested: %s", quality) return None quality_int: int = int(self.streamrip_config.session.tidal.quality) match quality: case "FLAC": quality_int = 2 case "Lossy": quality_int = 1 track_id_str: str = str(track_id) # Ensure client is logged in via safe call when needed inside _safe_api_call try: logging.critical("Using quality_int: %s", quality_int) track = await self._safe_api_call( self.streamrip_client.get_downloadable, track_id=track_id_str, quality=quality_int, retries=3, ) except Exception as e: logging.warning("get_stream_url_by_track_id failed: %s", e) return None if not track: logging.warning("No track found for ID: %s", track_id) return None stream_url = track.url if not stream_url: logging.warning("No stream URL found for track ID: %s", track_id) return None return stream_url async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]: """ Fetch track + album metadata with retries, caching album data. 
Returns combined metadata dict or None after exhausting retries. """ for attempt in range(1, self.MAX_METADATA_RETRIES + 1): try: # Track metadata metadata = await self.rate_limited_request( self.streamrip_client.get_metadata, str(track_id), "track" ) album_id = metadata.get("album", {}).get("id") album_metadata = None if album_id: # Check cache first if album_id in self.METADATA_ALBUM_CACHE: album_metadata = self.METADATA_ALBUM_CACHE[album_id] else: album_metadata = await self.rate_limited_request( lambda i, t: self._safe_api_call( self.streamrip_client.get_metadata, i, t, retries=2 ), album_id, "album", ) if not album_metadata: return None self.METADATA_ALBUM_CACHE[album_id] = album_metadata # Combine track + album metadata if not album_metadata: return None combined_metadata: dict = self.combine_album_track_metadata( album_metadata, metadata ) # Include album id so callers can fetch cover art if desired combined_metadata["album_id"] = album_id logging.info( "Combined metadata for track ID %s (attempt %d): %s", track_id, attempt, combined_metadata, ) return combined_metadata except Exception as e: err_str = str(e).lower() # If this is a permanent not found error, abort retries immediately if any( phrase in err_str for phrase in [ "track not found", "not found", "404", "does not exist", "no longer available", "asset is not ready", ] ): logging.error( "Metadata fetch permanent failure for track %s: %s (not retrying)", track_id, str(e), ) raise MetadataFetchError( f"Metadata fetch failed permanently for track {track_id}: {e}" ) # Exponential backoff with jitter for 429 or other errors delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5) if attempt < self.MAX_METADATA_RETRIES: logging.warning( "Retrying metadata fetch for track %s (attempt %d/%d): %s. 
Next retry in %.2fs", track_id, attempt, self.MAX_METADATA_RETRIES, str(e), delay, ) await asyncio.sleep(delay) else: logging.error( "Metadata fetch failed permanently for track %s after %d attempts: %s", track_id, self.MAX_METADATA_RETRIES, str(e), ) # Raise a specific exception so callers can react (e.g. notify) raise MetadataFetchError( f"Metadata fetch failed permanently for track {track_id} after {self.MAX_METADATA_RETRIES} attempts: {e}" ) # If we reach here without returning, raise a generic metadata error raise MetadataFetchError(f"Metadata fetch failed for track {track_id}") async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str: """Download track Args: track_id (int) quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW Returns: bool """ try: track_url = await self.get_stream_url_by_track_id(track_id) if not track_url: return False parsed_url = urlparse(track_url) parsed_url_filename = os.path.basename(parsed_url.path) parsed_url_ext = os.path.splitext(parsed_url_filename)[1] unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16] dl_folder_path = ( f"{self.streamrip_config.session.downloads.folder}/{unique}" ) dl_path = f"{dl_folder_path}/{track_id}.{parsed_url_ext}" # ensure download folder exists try: os.makedirs(dl_folder_path, exist_ok=True) except Exception: pass async with aiohttp.ClientSession() as session: async with session.get( track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60) ) as resp: resp.raise_for_status() with open(dl_path, "wb") as f: async for chunk in resp.content.iter_chunked(1024 * 64): f.write(chunk) return dl_path except Exception as e: logging.critical("Error: %s", str(e)) return False # ========================================================================= # Video Support # ========================================================================= async def search_videos(self, query: str, limit: int = 50) -> Optional[list[dict]]: """Search for videos by 
query string. Args: query: Search query (artist name, song title, etc.) limit: Maximum number of results to return. Returns: List of video results with id, title, artist, duration, etc. """ max_retries = 4 delay = 1.0 for attempt in range(max_retries): try: results = await self._safe_api_call( self.streamrip_client.search, media_type="video", query=query, limit=limit, retries=3, ) break except Exception as e: msg = str(e) if ("400" in msg or "429" in msg) and attempt < max_retries - 1: await asyncio.sleep(delay) delay *= 2 continue else: logging.warning("Video search failed: %s", e) return None else: return None if not results: return None # Results can be paged - get items from first page if isinstance(results, list): results_page = results[0] if results else {} else: results_page = results items = results_page.get("items", []) if isinstance(results_page, dict) else [] if not items: return None videos_out = [] for item in items: artist_info = item.get("artist") or item.get("artists", [{}])[0] if item.get("artists") else {} artist_name = artist_info.get("name", "Unknown Artist") if isinstance(artist_info, dict) else str(artist_info) videos_out.append({ "id": item.get("id"), "title": item.get("title"), "artist": artist_name, "duration": item.get("duration"), "duration_formatted": self.format_duration(item.get("duration")), "release_date": item.get("releaseDate"), "image_id": item.get("imageId"), "image_url": ( f"https://resources.tidal.com/images/{item.get('imageId').replace('-', '/')}/640x360.jpg" if item.get("imageId") else None ), "quality": item.get("quality"), }) return videos_out async def get_video_metadata(self, video_id: int) -> Optional[dict]: """Get metadata for a specific video by ID. Args: video_id: The Tidal video ID. Returns: Video metadata dict or None if not found. 
""" video_id_str = str(video_id) try: metadata = await self._safe_api_call( self.streamrip_client.get_metadata, item_id=video_id_str, media_type="video", retries=3, ) except Exception as e: logging.warning("get_video_metadata failed for %s: %s", video_id, e) return None if not metadata: return None artist_info = metadata.get("artist") or (metadata.get("artists", [{}])[0] if metadata.get("artists") else {}) artist_name = artist_info.get("name", "Unknown Artist") if isinstance(artist_info, dict) else str(artist_info) return { "id": metadata.get("id"), "title": metadata.get("title"), "artist": artist_name, "artists": [a.get("name") for a in metadata.get("artists", [])], "duration": metadata.get("duration"), "duration_formatted": self.format_duration(metadata.get("duration")), "release_date": metadata.get("releaseDate"), "image_id": metadata.get("imageId"), "image_url": ( f"https://resources.tidal.com/images/{metadata.get('imageId').replace('-', '/')}/1280x720.jpg" if metadata.get("imageId") else None ), "thumbnail_url": ( f"https://resources.tidal.com/images/{metadata.get('imageId').replace('-', '/')}/640x360.jpg" if metadata.get("imageId") else None ), "quality": metadata.get("quality"), "explicit": metadata.get("explicit"), "album": metadata.get("album", {}).get("title") if metadata.get("album") else None, "album_id": metadata.get("album", {}).get("id") if metadata.get("album") else None, } async def get_video_stream_url(self, video_id: int) -> Optional[str]: """Get the HLS stream URL for a video. Args: video_id: The Tidal video ID. Returns: The highest quality video HLS variant URL (.m3u8) or None if not available. 
""" video_id_str = str(video_id) logging.info("VIDEO %s: Fetching stream URL...", video_id) try: # First try the standard streamrip method logging.info("VIDEO %s: Trying streamrip get_video_file_url...", video_id) url = await self._safe_api_call( self.streamrip_client.get_video_file_url, video_id=video_id_str, retries=2, ) if url: logging.info("VIDEO %s: Got stream URL via streamrip", video_id) return url if url else None except Exception as e: # Streamrip's get_video_file_url may fail if Tidal returns HLS manifest # directly instead of a JSON with URLs. Try to get the manifest URL directly. err_msg = str(e) logging.info("VIDEO %s: streamrip method failed (%s), trying fallback...", video_id, err_msg[:100]) if "mpegurl" in err_msg.lower() or ".m3u8" in err_msg: # Extract the master manifest URL from the error message import re m3u8_match = re.search(r"(https://[^\s'\"]+\.m3u8[^\s'\"]*)", err_msg) if m3u8_match: master_url = m3u8_match.group(1) logging.info("VIDEO %s: Extracted HLS master URL from error", video_id) # Try to get the highest quality variant from the master playlist best_url = await self._get_best_variant_from_master(master_url) return best_url or master_url # Fall back to fetching the manifest URL directly from Tidal API try: logging.info("VIDEO %s: Trying direct API manifest fetch...", video_id) result = await self._get_video_manifest_url(video_id_str) if result: logging.info("VIDEO %s: Got stream URL via direct API", video_id) return result except Exception as e2: logging.warning("get_video_stream_url failed for %s: %s (fallback: %s)", video_id, e, e2) return None async def _get_best_variant_from_master(self, master_url: str) -> Optional[str]: """Parse HLS master playlist and return the highest quality variant URL.""" import re try: # Ensure we have a session if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session: self.streamrip_client.session = await self.streamrip_client.get_session() async with 
self.streamrip_client.session.get(master_url) as resp: if resp.status != 200: return None playlist_text = await resp.text() # Parse HLS master playlist for variant streams stream_pattern = re.compile( r'#EXT-X-STREAM-INF:.*?BANDWIDTH=(\d+).*?\n([^\n#]+)', re.MULTILINE ) matches = stream_pattern.findall(playlist_text) if matches: # Sort by bandwidth (highest quality = highest bandwidth) matches.sort(key=lambda x: int(x[0]), reverse=True) best_variant = matches[0][1].strip() # If it's a relative URL, make it absolute if not best_variant.startswith('http'): base_url = master_url.rsplit('/', 1)[0] best_variant = f"{base_url}/{best_variant}" logging.info("Selected highest quality variant: bandwidth=%s", matches[0][0]) return best_variant except Exception as e: logging.warning("Failed to parse HLS master playlist: %s", e) return None async def _get_video_manifest_url(self, video_id: str) -> Optional[str]: """Directly fetch the HLS manifest URL from Tidal API. This is a fallback when streamrip's method fails due to format changes. Returns the highest quality variant URL from the HLS master playlist. 
""" import base64 import re params = { "videoquality": "HIGH", "playbackmode": "STREAM", "assetpresentation": "FULL", } # Ensure we have a session if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session: self.streamrip_client.session = await self.streamrip_client.get_session() # Make the API request resp = await self.streamrip_client._api_request( f"videos/{video_id}/playbackinfopostpaywall", params=params ) if not resp or "manifest" not in resp: return None # Decode the manifest manifest_data = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8")) # The manifest should contain URLs - get the master playlist URL urls = manifest_data.get("urls", []) if not urls: return None master_url = urls[0] # Try to fetch the master playlist and find the highest quality variant try: async with self.streamrip_client.session.get(master_url) as resp: if resp.status == 200: playlist_text = await resp.text() # Parse HLS master playlist for variant streams # Look for lines like: #EXT-X-STREAM-INF:BANDWIDTH=...,RESOLUTION=1920x1080 # followed by the variant URL stream_pattern = re.compile( r'#EXT-X-STREAM-INF:.*?BANDWIDTH=(\d+).*?\n([^\n#]+)', re.MULTILINE ) matches = stream_pattern.findall(playlist_text) if matches: # Sort by bandwidth (highest quality = highest bandwidth) matches.sort(key=lambda x: int(x[0]), reverse=True) best_variant = matches[0][1].strip() # If it's a relative URL, make it absolute if not best_variant.startswith('http'): base_url = master_url.rsplit('/', 1)[0] best_variant = f"{base_url}/{best_variant}" logging.info("Selected highest quality video variant: bandwidth=%s", matches[0][0]) return best_variant except Exception as e: logging.warning("Failed to parse HLS master playlist: %s", e) # Fall back to returning the master URL (ffmpeg will pick a variant) return master_url async def download_video(self, video_id: int, output_path: Optional[str] = None) -> Optional[str]: """Download a video by ID. 
Args: video_id: The Tidal video ID. output_path: Optional path to save the video. Can be a directory or full file path. If not provided, a temp path is used. Returns: The path to the downloaded video file, or None on failure. """ try: logging.info("VIDEO %s: Getting stream URL...", video_id) video_url = await self.get_video_stream_url(video_id) if not video_url: logging.warning("No video URL for video ID: %s", video_id) return None logging.info("VIDEO %s: Got stream URL, preparing download...", video_id) # Determine output path if not output_path: unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16] dl_folder_path = f"{self.streamrip_config.session.downloads.folder}/{unique}" try: os.makedirs(dl_folder_path, exist_ok=True) except Exception: pass output_path = f"{dl_folder_path}/{video_id}.mp4" elif os.path.isdir(output_path): # If output_path is a directory, append the video filename output_path = os.path.join(output_path, f"{video_id}.mp4") # Video URLs are HLS manifests - use ffmpeg to download logging.info("VIDEO %s: Starting ffmpeg HLS download to %s", video_id, output_path) print(f"VIDEO {video_id}: Starting ffmpeg download...") cmd = [ "ffmpeg", "-nostdin", # Don't read from stdin - prevents SIGTTIN in background "-hide_banner", "-loglevel", "warning", "-analyzeduration", "10M", "-probesize", "10M", "-i", video_url, "-c:v", "copy", "-c:a", "aac", "-b:a", "256k", "-af", "aresample=async=1:first_pts=0", "-y", output_path, ] proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE, ) # Use communicate() to avoid buffer deadlocks _, stderr = await proc.communicate() if proc.returncode != 0: stderr_text = stderr.decode().strip() if stderr else "Unknown error" logging.error("ffmpeg video download failed for %s: %s", video_id, stderr_text) return None print(f"VIDEO {video_id}: ffmpeg completed, verifying file...") if not os.path.exists(output_path) or 
os.path.getsize(output_path) == 0: logging.error("Video download completed but file missing or empty") return None # Verify the MP4 is valid (has moov atom) verify_cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", output_path, ] verify_proc = await asyncio.create_subprocess_exec( *verify_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) verify_stdout, verify_stderr = await verify_proc.communicate() if verify_proc.returncode != 0: stderr_text = verify_stderr.decode().strip() if verify_stderr else "" logging.error("Downloaded video is corrupt (moov atom missing?): %s", stderr_text) # Clean up corrupt file try: os.remove(output_path) except Exception: pass return None duration = verify_stdout.decode().strip() if verify_stdout else "unknown" logging.info("Video %s downloaded to %s (%d bytes, duration: %ss)", video_id, output_path, os.path.getsize(output_path), duration) return output_path except Exception as e: logging.critical("Video download error for %s: %s", video_id, e) return None return None # Should not reach here, but satisfy type checker async def get_videos_by_artist_id(self, artist_id: int, limit: int = 50) -> Optional[list[dict]]: """Get videos by artist ID. Args: artist_id: The Tidal artist ID. limit: Maximum number of videos to return. Returns: List of videos by the artist or None if not found. 
""" artist_id_str = str(artist_id) # Ensure we have a session if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session: self.streamrip_client.session = await self.streamrip_client.get_session() try: # Use the direct Tidal API endpoint for artist videos resp = await self._safe_api_call( self.streamrip_client._api_request, f"artists/{artist_id_str}/videos", params={"limit": limit, "offset": 0}, retries=3, ) except Exception as e: logging.warning("get_videos_by_artist_id API call failed: %s", e) return None if not resp: return None # The response has an "items" array videos = resp.get("items", []) if not videos: return None videos_out = [] for video in videos: artist_info = video.get("artist") or (video.get("artists", [{}])[0] if video.get("artists") else {}) artist_name = artist_info.get("name", "Unknown Artist") if isinstance(artist_info, dict) else str(artist_info) videos_out.append({ "id": video.get("id"), "title": video.get("title"), "artist": artist_name, "duration": video.get("duration"), "duration_formatted": self.format_duration(video.get("duration")), "release_date": video.get("releaseDate"), "image_id": video.get("imageId"), "image_url": ( f"https://resources.tidal.com/images/{video.get('imageId').replace('-', '/')}/640x360.jpg" if video.get("imageId") else None ), }) return videos_out return videos_out async def get_lrc_by_track_id(self, track_id: int) -> Optional[str]: """Get LRC lyrics by track ID.""" logging.debug(f"SR: Fetching metadata for track ID {track_id}") metadata = await self.get_metadata_by_track_id(track_id) lrc = metadata.get("lyrics") if metadata else None logging.debug(f"SR: LRC {'found' if lrc else 'not found'}") return lrc async def get_lrc_by_artist_song( self, artist: str, song: str, album: Optional[str] = None, duration: Optional[int] = None, ) -> Optional[str]: """Get LRC lyrics by artist and song, optionally filtering by album and duration.""" logging.debug(f"SR: Searching tracks for {artist} - {song}") 
tracks = await self.get_tracks_by_artist_song(artist, song) logging.info(f"SR: Found {len(tracks) if tracks else 0} tracks") if not tracks: return None # Filter by album if provided if album: tracks = [ t for t in tracks if t.get("album", {}).get("title", "").lower() == album.lower() ] if not tracks: return None # Prefer exact title matches first (highest confidence) exact_title_matches = [] for t in tracks: found_title = t.get("title") if found_title and found_title.strip().lower() == song.strip().lower(): exact_title_matches.append(t) if exact_title_matches: logging.info(f"SR: {len(exact_title_matches)} exact title matches found") tracks = exact_title_matches else: # Prefer tracks that match artist/title fuzzily filtered_by_metadata = [] for t in tracks: found_artist = ( t.get("artist", {}).get("name") if isinstance(t.get("artist"), dict) else t.get("artist") ) found_album = ( t.get("album", {}).get("title") if t.get("album") else None ) found_title = t.get("title") try: if self.is_metadata_match( artist, album, song, found_artist, found_album, found_title ): filtered_by_metadata.append(t) except Exception: # On any error, skip strict metadata matching for this candidate continue if filtered_by_metadata: logging.info( f"SR: {len(filtered_by_metadata)} candidates after metadata filtering" ) tracks = filtered_by_metadata else: logging.info( "SR: No candidates passed metadata match filter; falling back to search results" ) # If duration provided, select the track with closest duration match if duration is not None: tracks_with_diff = [ (t, abs(t.get("duration", 0) - duration)) for t in tracks ] tracks_with_diff.sort(key=lambda x: x[1]) best_track, min_diff = tracks_with_diff[0] logging.info(f"SR: Best match duration diff: {min_diff}s") # If the closest match is more than 10 seconds off, consider no match if min_diff > 10: logging.info("SR: Duration diff too large, no match") return None else: best_track = tracks[0] track_id = best_track.get("id") # Ensure the 
selected candidate reasonably matches expected metadata selected_artist = ( best_track.get("artist", {}).get("name") if isinstance(best_track.get("artist"), dict) else best_track.get("artist") ) selected_title = best_track.get("title") if not self.is_metadata_match( artist, album, song, selected_artist, ( best_track.get("album", {}).get("title") if best_track.get("album") else None ), selected_title, ): # Try to find another candidate that does match metadata logging.warning( "SR: Selected candidate failed metadata check: id=%s artist=%s title=%s; searching for better match", track_id, selected_artist, selected_title, ) found_better = None for candidate in tracks: cand_artist = ( candidate.get("artist", {}).get("name") if isinstance(candidate.get("artist"), dict) else candidate.get("artist") ) cand_title = candidate.get("title") if self.is_metadata_match( artist, album, song, cand_artist, ( candidate.get("album", {}).get("title") if candidate.get("album") else None ), cand_title, ): found_better = candidate break if found_better: logging.warning( "SR: Switching to better candidate id=%s artist=%s title=%s", found_better.get("id"), ( found_better.get("artist", {}).get("name") if isinstance(found_better.get("artist"), dict) else found_better.get("artist") ), found_better.get("title"), ) best_track = found_better track_id = best_track.get("id") else: # No matching candidate passed metadata checks; log candidates and abort logging.warning( "SR: No candidates passed metadata checks for %s - %s; candidates: %s", artist, song, [ { "id": t.get("id"), "artist": ( t.get("artist", {}).get("name") if isinstance(t.get("artist"), dict) else t.get("artist") ), "title": t.get("title"), "duration": t.get("duration"), } for t in tracks[:10] ], ) return None logging.info( f"SR: Using track ID {track_id} (artist={best_track.get('artist')}, title={best_track.get('title')})" ) if not track_id: return None return await self.get_lrc_by_track_id(track_id)