from typing import Optional
from uuid import uuid4
from urllib.parse import urlparse
import hashlib
import logging
import os
import asyncio
import aiohttp
from streamrip.client import TidalClient  # type: ignore
from streamrip.config import Config as StreamripConfig  # type: ignore
from dotenv import load_dotenv

load_dotenv()


class SRUtil:
    """
    StreamRip API Utility Class

    Wraps a streamrip ``TidalClient`` configured from ``tidal_*`` environment
    variables and exposes async helpers for searching artists, listing
    albums/tracks, resolving stream URLs and downloading tracks.
    """

    def __init__(self) -> None:
        """Initialize StreamRip utility.

        Builds a default streamrip config, overrides the Tidal session,
        conversion and download settings from the environment, and creates
        the client. No network access happens here.
        """
        self.streamrip_config = StreamripConfig.defaults()
        session = self.streamrip_config.session
        tidal = session.tidal
        tidal.user_id = os.getenv("tidal_user_id", "")
        tidal.access_token = os.getenv("tidal_access_token", "")
        tidal.refresh_token = os.getenv("tidal_refresh_token", "")
        tidal.token_expiry = os.getenv("tidal_token_expiry", "")
        tidal.country_code = os.getenv("tidal_country_code", "")
        tidal.quality = int(os.getenv("tidal_default_quality", "2"))
        session.conversion.enabled = False
        session.downloads.folder = os.getenv("tidal_download_folder", "")
        self.streamrip_client = TidalClient(self.streamrip_config)

    def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
        """Drop entries whose ``key`` value repeats (case/whitespace-insensitive).

        Args:
            key (str): Dictionary key whose string value identifies duplicates.
            entries (list[dict]): Entries to filter; each must contain ``key``.
        Returns:
            list[dict]: First occurrence of each distinct value, in input order.
        Raises:
            KeyError: If an entry does not contain ``key``.
        """
        deduped: dict[str, dict] = {}
        for entry in entries:
            norm = entry[key].strip().lower()
            # setdefault keeps the first entry seen for each normalized value.
            deduped.setdefault(norm, entry)
        return list(deduped.values())

    def format_duration(self, seconds: Optional[float]) -> Optional[str]:
        """Format a duration in seconds as ``M:SS``.

        Args:
            seconds: Duration in seconds; fractional values are truncated.
        Returns:
            Optional[str]: The formatted duration, or None for a missing/zero
            duration.
        """
        if not seconds:
            return None
        # Truncate to whole seconds so float input does not render as "1.0:5.0".
        minutes, secs = divmod(int(seconds), 60)
        return f"{minutes}:{secs:02}"

    async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
        """Get artist(s) by name.

        Args:
            artist_name (str): The name of the artist.
        Returns:
            Optional[list[dict]]: Deduplicated ``{"artist", "id"}`` entries, or
            None if the search returned nothing.
        """
        try:
            await self.streamrip_client.login()
        except Exception as e:
            # Best effort: the search below retries the login on AttributeError.
            logging.info("Login Exception: %s", str(e))
        try:
            search_pages = await self.streamrip_client.search(
                media_type="artist", query=artist_name
            )
        except AttributeError:
            await self.streamrip_client.login()
            search_pages = await self.streamrip_client.search(
                media_type="artist", query=artist_name
            )
        logging.debug("Artists output: %s", search_pages)
        # Guard against an empty result list before reading the first page.
        artists = search_pages[0].get("items", []) if search_pages else []
        if not artists:
            logging.warning("No artist found for name: %s", artist_name)
            return None
        artists_out: list[dict] = [
            {
                "artist": res["name"],
                "id": res["id"],
            }
            for res in artists
            if "name" in res and "id" in res
        ]
        return self.dedupe_by_key("artist", artists_out)  # Remove duplicates

    async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
        """Get albums by artist ID

        Args:
            artist_id (int): The ID of the artist.
        Returns:
            Optional[list[dict]]: List of albums or None if not found.
        """
        artist_id_str: str = str(artist_id)
        try:
            await self.streamrip_client.login()
            metadata = await self.streamrip_client.get_metadata(
                item_id=artist_id_str, media_type="artist"
            )
        except AttributeError:
            await self.streamrip_client.login()
            metadata = await self.streamrip_client.get_metadata(
                item_id=artist_id_str, media_type="artist"
            )
        if not metadata:
            logging.warning("No metadata found for artist ID: %s", artist_id)
            return None
        # Filter incomplete albums BEFORE deduplicating: dedupe_by_key indexes
        # "title" directly and would raise KeyError on an album without one.
        complete_albums = [
            album
            for album in metadata.get("albums", [])
            if "title" in album and "id" in album and "artists" in album
        ]
        albums_out: list[dict] = [
            {
                "artist": ", ".join(artist["name"] for artist in album["artists"]),
                "album": album["title"],
                "id": album["id"],
                "release_date": album.get("releaseDate", "Unknown"),
            }
            for album in self.dedupe_by_key("title", complete_albums)
        ]
        logging.debug("Retrieved albums: %s", albums_out)
        return albums_out

    async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]:
        """Get tracks by album ID

        Args:
            album_id (int): The ID of the album.
        Returns:
            Optional[list[dict]]: List of tracks or None if not found.
        """
        album_id_str = str(album_id)
        await self.streamrip_client.login()
        metadata = await self.streamrip_client.get_metadata(
            item_id=album_id_str, media_type="album"
        )
        if not metadata:
            logging.warning("No metadata found for album ID: %s", album_id)
            return None
        tracks_out: list[dict] = [
            {
                "id": track.get("id"),
                # "artist" may be absent or None; fall back to an empty dict.
                "artist": (track.get("artist") or {}).get("name"),
                "title": track.get("title"),
                "duration": self.format_duration(track.get("duration", 0)),
                "version": track.get("version"),
                "audioQuality": track.get("audioQuality"),
            }
            for track in metadata.get("tracks", [])
        ]
        return tracks_out

    async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
        """Get track by artist and song name

        Args:
            artist (str): The name of the artist.
            song (str): The name of the song.
        Returns:
            Optional[list]: Currently always an empty list.
        TODO: Reimplement using StreamRip
        """
        return []

    async def get_stream_url_by_track_id(
        self, track_id: int, quality: str = "LOSSLESS"
    ) -> Optional[str]:
        """Get stream URL by track ID

        Args:
            track_id (int): The ID of the track.
            quality (str): The quality of the stream, default is "LOSSLESS".
                Other options: HIGH, LOW
        Returns:
            Optional[str]: The stream URL, or None if the quality is invalid or
            no track/URL was found.
        """
        if quality not in ("LOSSLESS", "HIGH", "LOW"):
            logging.error("Invalid quality requested: %s", quality)
            return None
        # "LOSSLESS" uses the configured default quality; the others map to
        # fixed numeric levels.
        default_quality = int(self.streamrip_config.session.tidal.quality)
        quality_int: int = {"HIGH": 1, "LOW": 0}.get(quality, default_quality)
        track_id_str: str = str(track_id)
        await self.streamrip_client.login()
        try:
            track = await self.streamrip_client.get_downloadable(
                track_id=track_id_str, quality=quality_int
            )
        except AttributeError:
            await self.streamrip_client.login()
            track = await self.streamrip_client.get_downloadable(
                track_id=track_id_str, quality=quality_int
            )
        if not track:
            logging.warning("No track found for ID: %s", track_id)
            return None
        stream_url = track.url
        if not stream_url:
            logging.warning("No stream URL found for track ID: %s", track_id)
            return None
        return stream_url

    async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
        """Get basic metadata (artist, album, song title) by track ID

        Args:
            track_id (int): The ID of the track.
        Returns:
            Optional[dict]: ``{"artist", "album", "song"}``, or None if the
            lookup failed. A missing title is replaced by a random UUID string.
        """
        try:
            await self.streamrip_client.login()
            metadata = await self.streamrip_client.get_metadata(str(track_id), "track")
            return {
                "artist": (metadata.get("artist") or {}).get("name", "Unknown Artist"),
                "album": (metadata.get("album") or {}).get("title", "Unknown Album"),
                # str() so the fallback title is a string like a real title.
                "song": metadata.get("title", str(uuid4())),
            }
        except Exception as e:
            logging.critical(
                "Get metadata for %s failed, Exception: %s", track_id, str(e)
            )
            return None

    async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str:
        """Download track

        Args:
            track_id (int): The ID of the track.
            quality (str): The quality of the stream, default is "LOSSLESS".
                Other options: HIGH, LOW
        Returns:
            bool | str: Path of the downloaded file on success, False on any
            failure.
        """
        try:
            # get_stream_url_by_track_id performs the login itself.
            track_url = await self.get_stream_url_by_track_id(track_id, quality)
            if not track_url:
                return False
            url_filename = os.path.basename(urlparse(track_url).path)
            # splitext keeps the leading dot (".flac"), so do not add another.
            url_ext = os.path.splitext(url_filename)[1]
            unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
            # os.path.join keeps an empty configured folder relative instead of
            # resolving to the filesystem root.
            dl_folder_path = os.path.join(
                self.streamrip_config.session.downloads.folder, unique
            )
            # The per-download directory is new and must exist before open().
            os.makedirs(dl_folder_path, exist_ok=True)
            dl_path = os.path.join(dl_folder_path, f"{track_id}{url_ext}")
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60)
                ) as resp:
                    resp.raise_for_status()
                    with open(dl_path, "wb") as f:
                        async for chunk in resp.content.iter_chunked(1024 * 64):
                            f.write(chunk)
            return dl_path
        except Exception as e:
            logging.critical("Error: %s", str(e))
            return False