Files
api/utils/sr_wrapper.py

269 lines
9.9 KiB
Python

from typing import Optional
from uuid import uuid4
from urllib.parse import urlparse
import hashlib
import logging
import os
import asyncio
import aiohttp
from streamrip.client import TidalClient # type: ignore
from streamrip.config import Config as StreamripConfig # type: ignore
from dotenv import load_dotenv
load_dotenv()
class SRUtil:
"""
StreamRip API Utility Class
"""
def __init__(self) -> None:
"""Initialize StreamRip utility."""
self.streamrip_config = StreamripConfig.defaults()
self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "")
self.streamrip_config.session.tidal.access_token = os.getenv(
"tidal_access_token", ""
)
self.streamrip_config.session.tidal.refresh_token = os.getenv(
"tidal_refresh_token", ""
)
self.streamrip_config.session.tidal.token_expiry = os.getenv(
"tidal_token_expiry", ""
)
self.streamrip_config.session.tidal.country_code = os.getenv(
"tidal_country_code", ""
)
self.streamrip_config.session.tidal.quality = int(
os.getenv("tidal_default_quality", 2)
)
self.streamrip_config.session.conversion.enabled = False
self.streamrip_config.session.downloads.folder = os.getenv(
"tidal_download_folder", ""
)
self.streamrip_config
self.streamrip_client = TidalClient(self.streamrip_config)
asyncio.get_event_loop().create_task(self.streamrip_client.login())
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
deduped = {}
for entry in entries:
norm = entry[key].strip().lower()
if norm not in deduped:
deduped[norm] = entry
return list(deduped.values())
def format_duration(self, seconds):
if not seconds:
return None
m, s = divmod(seconds, 60)
return f"{m}:{s:02}"
async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
"""Get artist(s) by name.
Args:
artist_name (str): The name of the artist.
Returns:
Optional[dict]: The artist details or None if not found.
"""
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
artists_out: list[dict] = []
try:
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
except AttributeError:
await self.streamrip_client.login()
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
artists = artists[0].get("items", [])
if not artists:
logging.warning("No artist found for name: %s", artist_name)
return None
artists_out = [
{
"artist": res["name"],
"id": res["id"],
}
for res in artists
if "name" in res and "id" in res
]
artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates
return artists_out
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
"""Get albums by artist ID
Args:
artist_id (int): The ID of the artist.
Returns:
Optional[list[dict]]: List of albums or None if not found.
"""
artist_id_str: str = str(artist_id)
albums_out: list[dict] = []
try:
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
)
except AttributeError:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
)
if not metadata:
logging.warning("No metadata found for artist ID: %s", artist_id)
return None
albums = self.dedupe_by_key("title", metadata.get("albums", []))
albums_out = [
{
"artist": ", ".join(artist["name"] for artist in album["artists"]),
"album": album["title"],
"id": album["id"],
"release_date": album.get("releaseDate", "Unknown"),
}
for album in albums
if "title" in album and "id" in album and "artists" in album
]
logging.debug("Retrieved albums: %s", albums_out)
return albums_out
async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]:
"""Get tracks by album ID
Args:
album_id (int): The ID of the album.
Returns:
Optional[list[dict]]: List of tracks or None if not found.
"""
album_id_str = str(album_id)
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=album_id_str, media_type="album"
)
if not metadata:
logging.warning("No metadata found for album ID: %s", album_id)
return None
track_list = metadata.get("tracks", [])
tracks_out: list[dict] = [
{
"id": track.get("id"),
"artist": track.get("artist").get("name"),
"title": track.get("title"),
"duration": self.format_duration(track.get("duration", 0)),
"version": track.get("version"),
"audioQuality": track.get("audioQuality"),
}
for track in track_list
]
return tracks_out
async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
"""Get track by artist and song name
Args:
artist (str): The name of the artist.
song (str): The name of the song.
Returns:
Optional[dict]: The track details or None if not found.
TODO: Reimplement using StreamRip
"""
return []
async def get_stream_url_by_track_id(
self, track_id: int, quality: str = "LOSSLESS"
) -> Optional[str]:
"""Get stream URL by track ID
Args:
track_id (int): The ID of the track.
quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
Returns:
Optional[str]: The stream URL or None if not found.
"""
if quality not in ["LOSSLESS", "HIGH", "LOW"]:
logging.error("Invalid quality requested: %s", quality)
quality_int: int = int(self.streamrip_config.session.tidal.quality)
match quality:
case "HIGH":
quality_int = 1
case "LOW":
quality_int = 0
track_id_str: str = str(track_id)
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
try:
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=quality_int
)
except AttributeError:
await self.streamrip_client.login()
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=quality_int
)
if not track:
logging.warning("No track found for ID: %s", track_id)
return None
stream_url = track.url
if not stream_url:
logging.warning("No stream URL found for track ID: %s", track_id)
return None
return stream_url
async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
try:
metadata = await self.streamrip_client.get_metadata(str(track_id), "track")
return {
"artist": metadata.get("artist", {}).get("name", "Unknown Artist"),
"album": metadata.get("album", {}).get("title", "Unknown Album"),
"song": metadata.get("title", uuid4()),
}
except Exception as e:
logging.critical(
"Get metadata for %s failed, Exception: %s", track_id, str(e)
)
return None
async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str:
"""Download track
Args:
track_id (int)
quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
Returns:
bool
"""
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
try:
track_url = await self.get_stream_url_by_track_id(track_id)
if not track_url:
return False
parsed_url = urlparse(track_url)
parsed_url_filename = os.path.basename(parsed_url.path)
parsed_url_ext = os.path.splitext(parsed_url_filename)[1]
unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
dl_folder_path = (
f"{self.streamrip_config.session.downloads.folder}/{unique}"
)
dl_path = f"{dl_folder_path}/{track_id}.{parsed_url_ext}"
async with aiohttp.ClientSession() as session:
async with session.get(
track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60)
) as resp:
resp.raise_for_status()
with open(dl_path, "wb") as f:
async for chunk in resp.content.iter_chunked(1024 * 64):
f.write(chunk)
return dl_path
except Exception as e:
logging.critical("Error: %s", str(e))
return False