from aiohttp import ClientSession, ClientTimeout from typing import Optional from urllib.parse import urlencode, quote import logging import os import asyncio from streamrip.client import TidalClient from streamrip.config import Config as StreamripConfig from dotenv import load_dotenv load_dotenv() class SRUtil: """ StreamRip API Utility Class """ def __init__(self) -> None: """Initialize StreamRip utility.""" self.streamrip_config = StreamripConfig.defaults() self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "") self.streamrip_config.session.tidal.access_token = os.getenv( "tidal_access_token", "" ) self.streamrip_config.session.tidal.refresh_token = os.getenv( "tidal_refresh_token", "" ) self.streamrip_config.session.tidal.token_expiry = os.getenv( "tidal_token_expiry", "" ) self.streamrip_config.session.tidal.country_code = os.getenv( "tidal_country_code", "" ) self.streamrip_config.session.tidal.quality = int( os.getenv("tidal_default_quality", 2) ) self.streamrip_config.session.conversion.enabled = False self.streamrip_config.session.downloads.folder = os.getenv( "tidal_download_folder", "" ) self.streamrip_config self.streamrip_client = TidalClient(self.streamrip_config) def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]: deduped = {} for entry in entries: norm = entry[key].strip().lower() if norm not in deduped: deduped[norm] = entry return list(deduped.values()) def format_duration(self, seconds): if not seconds: return None m, s = divmod(seconds, 60) return f"{m}:{s:02}" async def get_artists_by_name(self, artist_name: str) -> Optional[list]: """Get artist(s) by name from HiFi API. Args: artist_name (str): The name of the artist. Returns: Optional[dict]: The artist details or None if not found. """ if not self.streamrip_client.logged_in: await self.streamrip_client.login() artists_out: list[dict] = [] try: artists = await self.streamrip_client.search( media_type="artist", query=artist_name ) except AttributeError: await self.streamrip_client.login() artists = await self.streamrip_client.search( media_type="artist", query=artist_name ) artists = artists[0].get("items", []) if not artists: logging.warning("No artist found for name: %s", artist_name) return None artists_out = [ { "artist": res["name"], "id": res["id"], } for res in artists if "name" in res and "id" in res ] artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates return artists_out async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]: """Get albums by artist ID from HiFi API. Args: artist_id (int): The ID of the artist. Returns: Optional[list[dict]]: List of albums or None if not found. """ artist_id_str: str = str(artist_id) albums_out: list[dict] = [] try: if not self.streamrip_client.logged_in: await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=artist_id_str, media_type="artist" ) except AttributeError: await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=artist_id_str, media_type="artist" ) if not metadata: logging.warning("No metadata found for artist ID: %s", artist_id) return None albums = metadata.get("albums", []) albums_out = [ { "artist": ", ".join(artist["name"] for artist in album["artists"]), "album": album["title"], "id": album["id"], "release_date": album.get("releaseDate", "Unknown"), } for album in albums if "title" in album and "id" in album and "artists" in album ] logging.info("Retrieved albums: %s", albums_out) return albums_out async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]: """Get tracks by album ID from HiFi API. Args: album_id (int): The ID of the album. Returns: Optional[list[dict]]: List of tracks or None if not found. """ album_id_str = str(album_id) if not self.streamrip_client.logged_in: await self.streamrip_client.login() metadata = await self.streamrip_client.get_metadata( item_id=album_id_str, media_type="album" ) if not metadata: logging.warning("No metadata found for album ID: %s", album_id) return None track_list = metadata.get("tracks", []) tracks_out: list[dict] = [ { "id": track.get("id"), "artist": track.get("artist").get("name"), "title": track.get("title"), "duration": self.format_duration(track.get("duration", 0)), "version": track.get("version"), "audioQuality": track.get("audioQuality"), } for track in track_list ] return tracks_out async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]: """Get track by artist and song name from HiFi API. Args: artist (str): The name of the artist. song (str): The name of the song. Returns: Optional[dict]: The track details or None if not found. TODO: Reimplement using StreamRip """ return [] async def get_stream_url_by_track_id( self, track_id: int, quality: str = "LOSSLESS" ) -> Optional[str]: """Get stream URL by track ID from HiFi API. Args: track_id (int): The ID of the track. quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW Returns: Optional[str]: The stream URL or None if not found. """ track_id_str = str(track_id) track = await self.streamrip_client.get_downloadable( track_id=track_id_str, quality=self.streamrip_config.session.tidal.quality ) if not track: logging.warning("No track found for ID: %s", track_id) return None stream_url = track.url if not stream_url: logging.warning("No stream URL found for track ID: %s", track_id) return None return stream_url