from typing import Optional import logging import os from streamrip.client import TidalClient from streamrip.config import Config as StreamripConfig from dotenv import load_dotenv load_dotenv() class SRUtil: """ StreamRip API Utility Class """ def __init__(self) -> None: """Initialize StreamRip utility.""" self.streamrip_config = StreamripConfig.defaults() self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "") self.streamrip_config.session.tidal.access_token = os.getenv( "tidal_access_token", "" ) self.streamrip_config.session.tidal.refresh_token = os.getenv( "tidal_refresh_token", "" ) self.streamrip_config.session.tidal.token_expiry = os.getenv( "tidal_token_expiry", "" ) self.streamrip_config.session.tidal.country_code = os.getenv( "tidal_country_code", "" ) self.streamrip_config.session.tidal.quality = int( os.getenv("tidal_default_quality", 2) ) self.streamrip_config.session.conversion.enabled = False self.streamrip_config.session.downloads.folder = os.getenv( "tidal_download_folder", "" ) self.streamrip_config self.streamrip_client = TidalClient(self.streamrip_config) def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]: deduped = {} for entry in entries: norm = entry[key].strip().lower() if norm not in deduped: deduped[norm] = entry return list(deduped.values()) def format_duration(self, seconds): if not seconds: return None m, s = divmod(seconds, 60) return f"{m}:{s:02}" async def get_artists_by_name(self, artist_name: str) -> Optional[list]: """Get artist(s) by name. Args: artist_name (str): The name of the artist. Returns: Optional[dict]: The artist details or None if not found. """ if not self.streamrip_client.logged_in: await self.streamrip_client.login() artists_out: list[dict] = [] try: artists = await self.streamrip_client.search( media_type="artist", query=artist_name ) except AttributeError: await self.streamrip_client.login() artists = await self.streamrip_client.search( media_type="artist", query=artist_name ) artists = artists[0].get("items", []) if not artists: logging.warning("No artist found for name: %s", artist_name) return None artists_out = [ { "artist": res["name"], "id": res["id"], } for res in artists if "name" in res and "id" in res ] artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates return artists_out async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]: """Get albums by artist ID Args: artist_id (int): The ID of the artist. Returns: Optional[list[dict]]: List of albums or None if not found. """ artist_id_str: str = str(artist_id) albums_out: list[dict] = [] try: if not self.streamrip_client.logged_in: await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=artist_id_str, media_type="artist" ) except AttributeError: await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=artist_id_str, media_type="artist" ) if not metadata: logging.warning("No metadata found for artist ID: %s", artist_id) return None albums = metadata.get("albums", []) albums_out = [ { "artist": ", ".join(artist["name"] for artist in album["artists"]), "album": album["title"], "id": album["id"], "release_date": album.get("releaseDate", "Unknown"), } for album in albums if "title" in album and "id" in album and "artists" in album ] logging.info("Retrieved albums: %s", albums_out) return albums_out async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]: """Get tracks by album ID Args: album_id (int): The ID of the album. Returns: Optional[list[dict]]: List of tracks or None if not found. """ album_id_str = str(album_id) if not self.streamrip_client.logged_in: await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=album_id_str, media_type="album" ) if not metadata: logging.warning("No metadata found for album ID: %s", album_id) return None track_list = metadata.get("tracks", []) tracks_out: list[dict] = [ { "id": track.get("id"), "artist": track.get("artist").get("name"), "title": track.get("title"), "duration": self.format_duration(track.get("duration", 0)), "version": track.get("version"), "audioQuality": track.get("audioQuality"), } for track in track_list ] return tracks_out async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]: """Get track by artist and song name Args: artist (str): The name of the artist. song (str): The name of the song. Returns: Optional[dict]: The track details or None if not found. TODO: Reimplement using StreamRip """ return [] async def get_stream_url_by_track_id( self, track_id: int, quality: str = "LOSSLESS" ) -> Optional[str]: """Get stream URL by track ID Args: track_id (int): The ID of the track. quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW Returns: Optional[str]: The stream URL or None if not found. """ track_id_str = str(track_id) track = await self.streamrip_client.get_downloadable( track_id=track_id_str, quality=self.streamrip_config.session.tidal.quality ) if not track: logging.warning("No track found for ID: %s", track_id) return None stream_url = track.url if not stream_url: logging.warning("No stream URL found for track ID: %s", track_id) return None return stream_url