import asyncio
import hashlib
import logging
import os
from typing import Any, Optional
from urllib.parse import urlparse
from uuid import uuid4

import aiohttp
from dotenv import load_dotenv
from streamrip.client import TidalClient  # type: ignore
from streamrip.config import Config as StreamripConfig  # type: ignore

load_dotenv()
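
# SRUtil below reads its Tidal session settings from the environment (typically a .env
# file picked up by load_dotenv()). A sample .env, with placeholder values only, might
# look like:
#
#   tidal_user_id=12345678
#   tidal_access_token=<access token>
#   tidal_refresh_token=<refresh token>
#   tidal_token_expiry=<unix timestamp>
#   tidal_country_code=US
#   tidal_default_quality=2
#   tidal_download_folder=/tmp/tidal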


class SRUtil:
    """
    StreamRip API Utility Class
    """

    def __init__(self) -> None:
        """Initialize StreamRip utility."""
        self.streamrip_config = StreamripConfig.defaults()
        self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "")
        self.streamrip_config.session.tidal.access_token = os.getenv(
            "tidal_access_token", ""
        )
        self.streamrip_config.session.tidal.refresh_token = os.getenv(
            "tidal_refresh_token", ""
        )
        self.streamrip_config.session.tidal.token_expiry = os.getenv(
            "tidal_token_expiry", ""
        )
        self.streamrip_config.session.tidal.country_code = os.getenv(
            "tidal_country_code", ""
        )
        self.streamrip_config.session.tidal.quality = int(
            os.getenv("tidal_default_quality", "2")
        )
        self.streamrip_config.session.conversion.enabled = False
        self.streamrip_config.session.downloads.folder = os.getenv(
            "tidal_download_folder", ""
        )
        self.streamrip_client = TidalClient(self.streamrip_config)
        self.MAX_METADATA_RETRIES = 5
        self.RETRY_DELAY = 1.0  # seconds between retries

    def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
        """Deduplicate entries by a key, comparing values case-insensitively.

        The first entry seen for each normalized value is kept.
        """
        deduped = {}
        for entry in entries:
            norm = entry[key].strip().lower()
            if norm not in deduped:
                deduped[norm] = entry
        return list(deduped.values())

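    # Illustrative sketch of the behavior (made-up entries, not real API data):
    #   dedupe_by_key("artist", [{"artist": "Muse"}, {"artist": "muse "}, {"artist": "Blur"}])
    #   -> [{"artist": "Muse"}, {"artist": "Blur"}]   (first occurrence wins)
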
    def format_duration(self, seconds: Optional[int]) -> Optional[str]:
        """Format a duration in seconds as M:SS, or return None if it is missing."""
        if not seconds:
            return None
        m, s = divmod(int(seconds), 60)
        return f"{m}:{s:02}"

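    # e.g. format_duration(215) -> "3:35"; format_duration(0) and format_duration(None) -> None
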
    def combine_album_track_metadata(
        self, album_json: dict[str, Any], track_json: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Combine album-level and track-level metadata into a unified tag dictionary.
        If track_json comes from album_json['tracks'], it will override album-level values where relevant.
        """
        # Album-level
        combined = {
            "album": album_json.get("title"),
            "album_artist": album_json.get("artist", {}).get("name"),
            "release_date": album_json.get("releaseDate"),
            "album_type": album_json.get("type"),
            "total_tracks": album_json.get("numberOfTracks"),
            "upc": album_json.get("upc"),
            "album_copyright": album_json.get("copyright"),
            "album_cover_id": album_json.get("cover"),
            "album_cover_url": f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg"
            if album_json.get("cover")
            else None,
        }

        # Track-level (overrides or adds to album info)
        combined.update(
            {
                "title": track_json.get("title"),
                "artist": track_json.get("artist", {}).get("name"),
                "artists": [a.get("name") for a in track_json.get("artists", [])],
                "track_number": track_json.get("trackNumber"),
                "disc_number": track_json.get("volumeNumber"),
                "duration": track_json.get("duration"),
                "isrc": track_json.get("isrc"),
                "bpm": track_json.get("bpm"),
                "explicit": track_json.get("explicit"),
                "replaygain": track_json.get("replayGain"),
                "peak": track_json.get("peak"),
                "lyrics": track_json.get("lyrics"),
                "track_copyright": track_json.get("copyright"),
                "cover_id": track_json.get("album", {}).get(
                    "cover", album_json.get("cover")
                ),
                "cover_url": f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg"
                if (track_json.get("album", {}).get("cover") or album_json.get("cover"))
                else None,
            }
        )

        return combined

    def combine_album_with_all_tracks(
        self, album_json: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Return a list of combined metadata dicts for all tracks in an album JSON."""
        return [
            self.combine_album_track_metadata(album_json, t)
            for t in album_json.get("tracks", [])
        ]

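    # Rough shape of the result, with made-up values (keys follow the Tidal JSON fields used above):
    #   combine_album_track_metadata(
    #       {"title": "An Album", "artist": {"name": "An Artist"}, "cover": "abc-123"},
    #       {"title": "A Track", "trackNumber": 1, "artist": {"name": "An Artist"}},
    #   )
    #   -> {"album": "An Album", "album_artist": "An Artist", "title": "A Track",
    #       "track_number": 1, "artist": "An Artist", ...}
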
    async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
        """Get artist(s) by name.

        Args:
            artist_name (str): The name of the artist.
        Returns:
            Optional[list[dict]]: The matching artists, or None if none were found.
        """
        try:
            await self.streamrip_client.login()
        except Exception as e:
            logging.info("Login Exception: %s", str(e))
        artists_out: list[dict] = []
        try:
            artists = await self.streamrip_client.search(
                media_type="artist", query=artist_name
            )
        except AttributeError:
            await self.streamrip_client.login()
            artists = await self.streamrip_client.search(
                media_type="artist", query=artist_name
            )
        logging.debug("Artists output: %s", artists)
        artists = artists[0].get("items", [])
        if not artists:
            logging.warning("No artist found for name: %s", artist_name)
            return None
        artists_out = [
            {
                "artist": res["name"],
                "id": res["id"],
            }
            for res in artists
            if "name" in res and "id" in res
        ]
        artists_out = self.dedupe_by_key("artist", artists_out)  # Remove duplicates
        return artists_out

    async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list[dict]]:
        """Get albums by artist ID

        Args:
            artist_id (int): The ID of the artist.
        Returns:
            Optional[list[dict]]: List of albums or None if not found.
        """
        artist_id_str: str = str(artist_id)
        albums_out: list[dict] = []
        try:
            await self.streamrip_client.login()
            metadata = await self.streamrip_client.get_metadata(
                item_id=artist_id_str, media_type="artist"
            )
        except AttributeError:
            await self.streamrip_client.login()
            metadata = await self.streamrip_client.get_metadata(
                item_id=artist_id_str, media_type="artist"
            )
        if not metadata:
            logging.warning("No metadata found for artist ID: %s", artist_id)
            return None
        albums = self.dedupe_by_key("title", metadata.get("albums", []))
        albums_out = [
            {
                "artist": ", ".join(artist["name"] for artist in album["artists"]),
                "album": album["title"],
                "id": album["id"],
                "release_date": album.get("releaseDate", "Unknown"),
            }
            for album in albums
            if "title" in album and "id" in album and "artists" in album
        ]

        logging.debug("Retrieved albums: %s", albums_out)
        return albums_out

    async def get_tracks_by_album_id(
        self, album_id: int, quality: str = "FLAC"
    ) -> Optional[list[dict]]:
        """Get tracks by album ID

        Args:
            album_id (int): The ID of the album.
            quality (str): Currently unused.
        Returns:
            Optional[list[dict]]: List of tracks or None if not found.
        """
        album_id_str = str(album_id)
        await self.streamrip_client.login()
        metadata = await self.streamrip_client.get_metadata(
            item_id=album_id_str, media_type="album"
        )
        if not metadata:
            logging.warning("No metadata found for album ID: %s", album_id)
            return None

        track_list = metadata.get("tracks", [])
        tracks_out: list[dict] = [
            {
                "id": track.get("id"),
                "artist": track.get("artist", {}).get("name"),
                "title": track.get("title"),
                "duration": self.format_duration(track.get("duration", 0)),
                "version": track.get("version"),
                "audioQuality": track.get("audioQuality"),
            }
            for track in track_list
        ]
        return tracks_out

    async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
        """Get track by artist and song name

        Args:
            artist (str): The name of the artist.
            song (str): The name of the song.
        Returns:
            Optional[list]: The matching tracks (currently always an empty list).
        TODO: Reimplement using StreamRip
        """
        return []

    async def get_stream_url_by_track_id(
        self, track_id: int, quality: str = "FLAC"
    ) -> Optional[str]:
        """Get stream URL by track ID

        Args:
            track_id (int): The ID of the track.
            quality (str): The quality of the stream, default is "FLAC". Other option: "Lossy".
        Returns:
            Optional[str]: The stream URL or None if not found.
        """
        if quality not in ["FLAC", "Lossy"]:
            logging.error("Invalid quality requested: %s", quality)
            return None
        quality_int: int = int(self.streamrip_config.session.tidal.quality)

        match quality:
            case "FLAC":
                quality_int = 2
            case "Lossy":
                quality_int = 1

        track_id_str: str = str(track_id)

        await self.streamrip_client.login()

        try:
            logging.debug("Using quality_int: %s", quality_int)
            track = await self.streamrip_client.get_downloadable(
                track_id=track_id_str, quality=quality_int
            )
        except AttributeError:
            await self.streamrip_client.login()
            track = await self.streamrip_client.get_downloadable(
                track_id=track_id_str, quality=quality_int
            )
        if not track:
            logging.warning("No track found for ID: %s", track_id)
            return None
        stream_url = track.url
        if not stream_url:
            logging.warning("No stream URL found for track ID: %s", track_id)
            return None
        return stream_url

    async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
        """
        Fetch track + album metadata with retries.
        Returns combined metadata dict or None after exhausting retries.
        """
        for attempt in range(1, self.MAX_METADATA_RETRIES + 1):
            try:
                await self.streamrip_client.login()
                metadata = await self.streamrip_client.get_metadata(
                    str(track_id), "track"
                )
                album_id = metadata.get("album", {}).get("id")
                album_metadata = await self.streamrip_client.get_metadata(
                    album_id, "album"
                )
                combined_metadata: dict = self.combine_album_track_metadata(
                    album_metadata, metadata
                )
                logging.info(
                    "Combined metadata for track ID %s (attempt %d): %s",
                    track_id,
                    attempt,
                    combined_metadata,
                )
                return combined_metadata
            except Exception as e:
                logging.warning(
                    "Metadata fetch failed for track %s (attempt %d/%d): %s",
                    track_id,
                    attempt,
                    self.MAX_METADATA_RETRIES,
                    str(e),
                )
                if attempt < self.MAX_METADATA_RETRIES:
                    await asyncio.sleep(self.RETRY_DELAY)
                else:
                    logging.error(
                        "Metadata fetch failed permanently for track %s after %d attempts",
                        track_id,
                        self.MAX_METADATA_RETRIES,
                    )
                    return None

    async def download(self, track_id: int, quality: str = "FLAC") -> bool | str:
        """Download track

        Args:
            track_id (int)
            quality (str): The quality of the stream, default is "FLAC". Other option: "Lossy".
        Returns:
            str: the downloaded file path, or False on failure
        """
        try:
            await self.streamrip_client.login()
            track_url = await self.get_stream_url_by_track_id(track_id, quality)
            if not track_url:
                return False
            parsed_url = urlparse(track_url)
            parsed_url_filename = os.path.basename(parsed_url.path)
            parsed_url_ext = os.path.splitext(parsed_url_filename)[1]  # includes the leading dot
            unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
            dl_folder_path = (
                f"{self.streamrip_config.session.downloads.folder}/{unique}"
            )
            os.makedirs(dl_folder_path, exist_ok=True)
            dl_path = f"{dl_folder_path}/{track_id}{parsed_url_ext}"
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60)
                ) as resp:
                    resp.raise_for_status()
                    with open(dl_path, "wb") as f:
                        async for chunk in resp.content.iter_chunked(1024 * 64):
                            f.write(chunk)
            return dl_path
        except Exception as e:
            logging.critical("Error: %s", str(e))
            return False
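

# A minimal usage sketch, not part of the module's API: it assumes the Tidal credentials
# read in SRUtil.__init__ are present in the environment, and the artist name below is a
# placeholder chosen purely for illustration.
if __name__ == "__main__":

    async def _demo() -> None:
        sr = SRUtil()
        artists = await sr.get_artists_by_name("Radiohead")
        print(artists)
        if artists:
            albums = await sr.get_albums_by_artist_id(artists[0]["id"])
            print(albums)

    asyncio.run(_demo())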