from typing import Optional, Any from uuid import uuid4 from urllib.parse import urlparse import hashlib import logging import random import asyncio import os import aiohttp import time from streamrip.client import TidalClient # type: ignore from streamrip.config import Config as StreamripConfig # type: ignore from dotenv import load_dotenv load_dotenv() class SRUtil: """ StreamRip API Utility Class """ def __init__(self) -> None: """Initialize StreamRip utility.""" self.streamrip_config = StreamripConfig.defaults() self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "") self.streamrip_config.session.tidal.access_token = os.getenv( "tidal_access_token", "" ) self.streamrip_config.session.tidal.refresh_token = os.getenv( "tidal_refresh_token", "" ) self.streamrip_config.session.tidal.token_expiry = os.getenv( "tidal_token_expiry", "" ) self.streamrip_config.session.tidal.country_code = os.getenv( "tidal_country_code", "" ) self.streamrip_config.session.tidal.quality = int( os.getenv("tidal_default_quality", 2) ) self.streamrip_config.session.conversion.enabled = False self.streamrip_config.session.downloads.folder = os.getenv( "tidal_download_folder", "" ) self.streamrip_config self.streamrip_client = TidalClient(self.streamrip_config) self.MAX_CONCURRENT_METADATA_REQUESTS = 2 self.METADATA_RATE_LIMIT = 1.25 self.METADATA_SEMAPHORE = asyncio.Semaphore(self.MAX_CONCURRENT_METADATA_REQUESTS) self.LAST_METADATA_REQUEST = 0 self.MAX_METADATA_RETRIES = 5 self.METADATA_ALBUM_CACHE: dict[str, dict] = {} self.RETRY_DELAY = 1.0 # seconds between retries async def rate_limited_request(self, func, *args, **kwargs): async with self.METADATA_SEMAPHORE: now = time.time() elapsed = now - self.LAST_METADATA_REQUEST if elapsed < self.METADATA_RATE_LIMIT: await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed) result = await func(*args, **kwargs) self.last_request_time = time.time() return result def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]: deduped = {} for entry in entries: norm = entry[key].strip().lower() if norm not in deduped: deduped[norm] = entry return list(deduped.values()) def format_duration(self, seconds): if not seconds: return None m, s = divmod(seconds, 60) return f"{m}:{s:02}" def combine_album_track_metadata( self, album_json: dict | None, track_json: dict ) -> dict: """ Combine album-level and track-level metadata into a unified tag dictionary. Track-level metadata overrides album-level where relevant. """ album_json = album_json or {} # Album-level combined = { "album": album_json.get("title"), "album_artist": album_json.get("artist", {}).get("name"), "release_date": album_json.get("releaseDate"), "album_type": album_json.get("type"), "total_tracks": album_json.get("numberOfTracks"), "upc": album_json.get("upc"), "album_copyright": album_json.get("copyright"), "album_cover_id": album_json.get("cover"), "album_cover_url": f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg" if album_json.get("cover") else None, } # Track-level (overrides or adds to album info) combined.update( { "title": track_json.get("title"), "artist": track_json.get("artist", {}).get("name"), "artists": [a.get("name") for a in track_json.get("artists", [])], "track_number": track_json.get("trackNumber"), "disc_number": track_json.get("volumeNumber"), "duration": track_json.get("duration"), "isrc": track_json.get("isrc"), "bpm": track_json.get("bpm"), "explicit": track_json.get("explicit"), "replaygain": track_json.get("replayGain"), "peak": track_json.get("peak"), "lyrics": track_json.get("lyrics"), "track_copyright": track_json.get("copyright"), "cover_id": track_json.get("album", {}).get("cover") or album_json.get("cover"), "cover_url": ( f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg" if (track_json.get("album", {}).get("cover") or album_json.get("cover")) else None ), } ) return combined def combine_album_with_all_tracks( self, album_json: dict[str, Any] ) -> list[dict[str, Any]]: """Return a list of combined metadata dicts for all tracks in an album JSON.""" return [ self.combine_album_track_metadata(album_json, t) for t in album_json.get("tracks", []) ] async def get_artists_by_name(self, artist_name: str) -> Optional[list]: """Get artist(s) by name. Args: artist_name (str): The name of the artist. Returns: Optional[dict]: The artist details or None if not found. """ try: await self.streamrip_client.login() except Exception as e: logging.info("Login Exception: %s", str(e)) pass artists_out: list[dict] = [] try: artists = await self.streamrip_client.search( media_type="artist", query=artist_name ) except AttributeError: await self.streamrip_client.login() artists = await self.streamrip_client.search( media_type="artist", query=artist_name ) logging.critical("Artists output: %s", artists) artists = artists[0].get("items", []) if not artists: logging.warning("No artist found for name: %s", artist_name) return None artists_out = [ { "artist": res["name"], "id": res["id"], } for res in artists if "name" in res and "id" in res ] artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates return artists_out async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]: """Get albums by artist ID Args: artist_id (int): The ID of the artist. Returns: Optional[list[dict]]: List of albums or None if not found. """ artist_id_str: str = str(artist_id) albums_out: list[dict] = [] try: await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=artist_id_str, media_type="artist" ) except AttributeError: await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=artist_id_str, media_type="artist" ) if not metadata: logging.warning("No metadata found for artist ID: %s", artist_id) return None albums = self.dedupe_by_key("title", metadata.get("albums", [])) albums_out = [ { "artist": ", ".join(artist["name"] for artist in album["artists"]), "album": album["title"], "id": album["id"], "release_date": album.get("releaseDate", "Unknown"), } for album in albums if "title" in album and "id" in album and "artists" in album ] logging.debug("Retrieved albums: %s", albums_out) return albums_out async def get_tracks_by_album_id( self, album_id: int, quality: str = "FLAC" ) -> Optional[list | dict]: """Get tracks by album ID Args: album_id (int): The ID of the album. Returns: Optional[list[dict]]: List of tracks or None if not found. """ album_id_str = str(album_id) await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=album_id_str, media_type="album" ) if not metadata: logging.warning("No metadata found for album ID: %s", album_id) return None track_list = metadata.get("tracks", []) tracks_out: list[dict] = [ { "id": track.get("id"), "artist": track.get("artist").get("name"), "title": track.get("title"), "duration": self.format_duration(track.get("duration", 0)), "version": track.get("version"), "audioQuality": track.get("audioQuality"), } for track in track_list ] return tracks_out async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]: """Get track by artist and song name Args: artist (str): The name of the artist. song (str): The name of the song. Returns: Optional[dict]: The track details or None if not found. TODO: Reimplement using StreamRip """ return [] async def get_stream_url_by_track_id( self, track_id: int, quality: str = "FLAC" ) -> Optional[str]: """Get stream URL by track ID Args: track_id (int): The ID of the track. quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW Returns: Optional[str]: The stream URL or None if not found. """ if quality not in ["FLAC", "Lossy"]: logging.error("Invalid quality requested: %s", quality) return None quality_int: int = int(self.streamrip_config.session.tidal.quality) match quality: case "FLAC": quality_int = 2 case "Lossy": quality_int = 1 track_id_str: str = str(track_id) await self.streamrip_client.login() try: logging.critical("Using quality_int: %s", quality_int) track = await self.streamrip_client.get_downloadable( track_id=track_id_str, quality=quality_int ) except AttributeError: await self.streamrip_client.login() track = await self.streamrip_client.get_downloadable( track_id=track_id_str, quality=quality_int ) if not track: logging.warning("No track found for ID: %s", track_id) return None stream_url = track.url if not stream_url: logging.warning("No stream URL found for track ID: %s", track_id) return None return stream_url async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]: """ Fetch track + album metadata with retries, caching album data. Returns combined metadata dict or None after exhausting retries. """ for attempt in range(1, self.MAX_METADATA_RETRIES + 1): try: await self.streamrip_client.login() # Track metadata metadata = await self.rate_limited_request( self.streamrip_client.get_metadata, str(track_id), "track" ) album_id = metadata.get("album", {}).get("id") album_metadata = None if album_id: # Check cache first if album_id in self.METADATA_ALBUM_CACHE: album_metadata = self.METADATA_ALBUM_CACHE[album_id] else: album_metadata = await self.rate_limited_request( self.streamrip_client.get_metadata, album_id, "album" ) if not album_metadata: return None self.METADATA_ALBUM_CACHE[album_id] = album_metadata # Combine track + album metadata if not album_metadata: return None combined_metadata: dict = self.combine_album_track_metadata( album_metadata, metadata ) logging.info( "Combined metadata for track ID %s (attempt %d): %s", track_id, attempt, combined_metadata, ) return combined_metadata except Exception as e: # Exponential backoff with jitter for 429 or other errors delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5) logging.warning( "Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs", track_id, attempt, self.MAX_METADATA_RETRIES, str(e), delay, ) if attempt < self.MAX_METADATA_RETRIES: await asyncio.sleep(delay) else: logging.error( "Metadata fetch failed permanently for track %s after %d attempts", track_id, self.MAX_METADATA_RETRIES, ) return None async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str: """Download track Args: track_id (int) quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW Returns: bool """ try: await self.streamrip_client.login() track_url = await self.get_stream_url_by_track_id(track_id) if not track_url: return False parsed_url = urlparse(track_url) parsed_url_filename = os.path.basename(parsed_url.path) parsed_url_ext = os.path.splitext(parsed_url_filename)[1] unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16] dl_folder_path = ( f"{self.streamrip_config.session.downloads.folder}/{unique}" ) dl_path = f"{dl_folder_path}/{track_id}.{parsed_url_ext}" async with aiohttp.ClientSession() as session: async with session.get( track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60) ) as resp: resp.raise_for_status() with open(dl_path, "wb") as f: async for chunk in resp.content.iter_chunked(1024 * 64): f.write(chunk) return dl_path except Exception as e: logging.critical("Error: %s", str(e)) return False