diff --git a/endpoints/rip.py b/endpoints/rip.py index 00cb337..f683286 100644 --- a/endpoints/rip.py +++ b/endpoints/rip.py @@ -27,9 +27,7 @@ class RIP(FastAPI): } for endpoint, handler in self.endpoints.items(): - dependencies = [ - Depends(RateLimiter(times=8, seconds=2)) - ] + dependencies = [Depends(RateLimiter(times=8, seconds=2))] app.add_api_route( f"/{endpoint}", handler, diff --git a/utils/hifi_wrapper.py b/utils/hifi_wrapper.py index 2acc744..2efc8da 100644 --- a/utils/hifi_wrapper.py +++ b/utils/hifi_wrapper.py @@ -2,17 +2,45 @@ from aiohttp import ClientSession, ClientTimeout from typing import Optional from urllib.parse import urlencode, quote import logging +import os +import asyncio +from streamrip.client import TidalClient +from streamrip.config import Config as StreamripConfig +from dotenv import load_dotenv + +load_dotenv() -class HifiUtil: +class SRUtil: """ - HiFi API Utility Class + StreamRip API Utility Class """ def __init__(self) -> None: - """Initialize HiFi API utility with base URLs.""" - self.hifi_api_url: str = "http://127.0.0.1:8000" - self.hifi_search_url: str = f"{self.hifi_api_url}/search" + """Initialize StreamRip utility.""" + self.streamrip_config = StreamripConfig.defaults() + self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "") + self.streamrip_config.session.tidal.access_token = os.getenv( + "tidal_access_token", "" + ) + self.streamrip_config.session.tidal.refresh_token = os.getenv( + "tidal_refresh_token", "" + ) + self.streamrip_config.session.tidal.token_expiry = os.getenv( + "tidal_token_expiry", "" + ) + self.streamrip_config.session.tidal.country_code = os.getenv( + "tidal_country_code", "" + ) + self.streamrip_config.session.tidal.quality = int( + os.getenv("tidal_default_quality", 2) + ) + self.streamrip_config.session.conversion.enabled = False + self.streamrip_config.session.downloads.folder = os.getenv( + "tidal_download_folder", "" + ) + self.streamrip_config + self.streamrip_client = TidalClient(self.streamrip_config) def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]: deduped = {} @@ -28,109 +56,6 @@ class HifiUtil: m, s = divmod(seconds, 60) return f"{m}:{s:02}" - async def search( - self, - artist: str, - song: str = "", - album: str = "", - video_name: str = "", - playlist_name: str = "", - ) -> Optional[list[dict]]: - """Search HiFi API - Args: - artist (str, required) - song (str, optional) - album (str, optional) - video_name (str, optional) - playlist_name (str, optional) - Returns: - Optional[dict]: Returns the first result from the HiFi API search or None if no results found. - """ - async with ClientSession(timeout=ClientTimeout(total=30)) as session: - params: dict = { - "a": artist, - "s": song, - "al": album, - "v": video_name, - "p": playlist_name, - } - query: str = urlencode(params, quote_via=quote) - built_url: str = f"{self.hifi_search_url}?{query}" - async with session.get(built_url) as response: - json_response: dict = await response.json() - if isinstance(json_response, list): - json_response = json_response[0] - key = next(iter(json_response), None) - if not key: - logging.error("No matching key found in JSON response.") - return None - logging.info("Key: %s", key) - if key not in ["limit"]: - json_response = json_response[key] - items: list[dict] = json_response.get("items", []) - if not items: - logging.info("No results found.") - return None - return items - - async def _retrieve( - self, req_type: str, id: int, quality: str = "LOSSLESS" - ) -> Optional[list | dict]: - """Retrieve a specific item by type and ID from the HiFi API. - Args: - type (str): The type of item (e.g., 'song', 'album'). - id (int): The ID of the item. - quality (str): The quality of the item, default is "LOSSLESS". Other options: HIGH, LOW - Returns: - Optional[dict]: The item details or None if not found. - """ - async with ClientSession(timeout=ClientTimeout(total=10)) as session: - params: dict = { - "id": id, - "quality": quality, - } - if req_type not in ["track", "artist", "album", "playlist", "video"]: - logging.error("Invalid type: %s", type) - return None - if req_type in ["artist"]: - params.pop("id") - params["f"] = ( - id # For non-track types, use 'f' instead of 'id' for full API output - ) - query: str = urlencode(params, quote_via=quote) - built_url: str = f"{self.hifi_api_url}/{req_type}/?{query}" - logging.info("Built URL: %s", built_url) - async with session.get(built_url) as response: - if response.status != 200: - logging.warning("Item not found: %s %s", req_type, id) - return None - response_json: list | dict = await response.json() - match req_type: - case "artist": - response_json = response_json[0] - if not isinstance(response_json, dict): - logging.error( - "Expected a dict but got: %s", type(response_json) - ) - return None - response_json_rows = response_json.get("rows") - if not isinstance(response_json_rows, list): - logging.error( - "Expected a list but got: %s", type(response_json_rows) - ) - return None - response_json = ( - response_json_rows[0].get("modules")[0].get("pagedList") - ) - case "album": - return response_json[1].get("items", []) - case "track": - return response_json - if not isinstance(response_json, dict): - logging.error("Expected a list but got: %s", type(response_json)) - return None - return response_json.get("items") - async def get_artists_by_name(self, artist_name: str) -> Optional[list]: """Get artist(s) by name from HiFi API. Args: @@ -138,8 +63,20 @@ class HifiUtil: Returns: Optional[dict]: The artist details or None if not found. """ + + if not self.streamrip_client.logged_in: + await self.streamrip_client.login() artists_out: list[dict] = [] - artists = await self.search(artist=artist_name) + try: + artists = await self.streamrip_client.search( + media_type="artist", query=artist_name + ) + except AttributeError: + await self.streamrip_client.login() + artists = await self.streamrip_client.search( + media_type="artist", query=artist_name + ) + artists = artists[0].get("items", []) if not artists: logging.warning("No artist found for name: %s", artist_name) return None @@ -161,11 +98,23 @@ class HifiUtil: Returns: Optional[list[dict]]: List of albums or None if not found. """ + artist_id_str: str = str(artist_id) albums_out: list[dict] = [] - albums = await self._retrieve("artist", artist_id) - if not albums: - logging.warning("No albums found for artist ID: %s", artist_id) + try: + if not self.streamrip_client.logged_in: + await self.streamrip_client.login() + metadata = await self.streamrip_client.get_metadata( + item_id=artist_id_str, media_type="artist" + ) + except AttributeError: + await self.streamrip_client.login() + metadata = await self.streamrip_client.get_metadata( + item_id=artist_id_str, media_type="artist" + ) + if not metadata: + logging.warning("No metadata found for artist ID: %s", artist_id) return None + albums = metadata.get("albums", []) albums_out = [ { "artist": ", ".join(artist["name"] for artist in album["artists"]), @@ -187,18 +136,25 @@ class HifiUtil: Returns: Optional[list[dict]]: List of tracks or None if not found. """ - track_list = await self._retrieve("album", album_id) - if not track_list: - logging.warning("No tracks found for album ID: %s", album_id) + album_id_str = str(album_id) + if not self.streamrip_client.logged_in: + await self.streamrip_client.login() + metadata = await self.streamrip_client.get_metadata( + item_id=album_id_str, media_type="album" + ) + if not metadata: + logging.warning("No metadata found for album ID: %s", album_id) return None + + track_list = metadata.get("tracks", []) tracks_out: list[dict] = [ { - "id": track.get("item").get("id"), - "artist": track.get("item").get("artist").get("name"), - "title": track.get("item").get("title"), - "duration": self.format_duration(track.get("item").get("duration", 0)), - "version": track.get("item").get("version"), - "audioQuality": track.get("item").get("audioQuality"), + "id": track.get("id"), + "artist": track.get("artist").get("name"), + "title": track.get("title"), + "duration": self.format_duration(track.get("duration", 0)), + "version": track.get("version"), + "audioQuality": track.get("audioQuality"), } for track in track_list ] @@ -211,28 +167,9 @@ class HifiUtil: song (str): The name of the song. Returns: Optional[dict]: The track details or None if not found. + TODO: Reimplement using StreamRip """ - tracks_out: list[dict] = [] - tracks = await self.search(artist=artist, song=song) - if not tracks: - logging.warning("No track found for artist: %s, song: %s", artist, song) - return None - tracks_out = [ - { - "artist": ", ".join(artist["name"] for artist in track["artists"]), - "song": track["title"], - "id": track["id"], - "album": track.get("album", {}).get("title", "Unknown"), - "duration": track.get("duration", 0), - } - for track in tracks - if "title" in track and "id" in track and "artists" in track - ] - if not tracks_out: - logging.warning("No valid tracks found after processing.") - return None - logging.info("Retrieved tracks: %s", tracks_out) - return tracks_out + return [] async def get_stream_url_by_track_id( self, track_id: int, quality: str = "LOSSLESS" @@ -244,13 +181,15 @@ class HifiUtil: Returns: Optional[str]: The stream URL or None if not found. """ - track = await self._retrieve("track", track_id, quality) - if not isinstance(track, list): + track_id_str = str(track_id) + track = await self.streamrip_client.get_downloadable( + track_id=track_id_str, quality=self.streamrip_config.session.tidal.quality + ) + if not track: logging.warning("No track found for ID: %s", track_id) return None - stream_url = track[2].get("OriginalTrackUrl") + stream_url = track.url if not stream_url: logging.warning("No stream URL found for track ID: %s", track_id) return None - logging.info("Retrieved stream URL: %s", stream_url) return stream_url