# api/utils/sr_wrapper.py
from typing import Optional, Any, Callable
from uuid import uuid4
from urllib.parse import urlparse
import hashlib
import traceback
import logging
import random
import asyncio
import os
import time

import aiohttp
from dotenv import load_dotenv
from rapidfuzz import fuzz
from streamrip.client import TidalClient  # type: ignore
from streamrip.config import Config as StreamripConfig  # type: ignore


class MetadataFetchError(Exception):
    """Raised when metadata fetch permanently fails after retries."""


# Configure logging for this module and its children. The level is
# temporarily INFO (rather than suppressed) while debugging LRC fetching.
for name in [__name__, "utils.sr_wrapper"]:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)  # Temporarily INFO for debugging LRC
    logger.propagate = False
    for handler in logger.handlers:
        handler.setLevel(logging.INFO)
# As a last resort the root logger could be raised to CRITICAL,
# but that would affect logging globally:
# logging.getLogger().setLevel(logging.CRITICAL)

load_dotenv()


class SRUtil:
    """
    StreamRip API Utility Class
    """

    def __init__(self) -> None:
        """Initialize StreamRip utility."""
        self.streamrip_config = StreamripConfig.defaults()
        self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "")
        self.streamrip_config.session.tidal.access_token = os.getenv(
            "tidal_access_token", ""
        )
        self.streamrip_config.session.tidal.refresh_token = os.getenv(
            "tidal_refresh_token", ""
        )
        self.streamrip_config.session.tidal.token_expiry = os.getenv(
            "tidal_token_expiry", ""
        )
        self.streamrip_config.session.tidal.country_code = os.getenv(
            "tidal_country_code", ""
        )
        self.streamrip_config.session.tidal.quality = int(
            os.getenv("tidal_default_quality", "2")
        )
        self.streamrip_config.session.conversion.enabled = False
        self.streamrip_config.session.downloads.folder = os.getenv(
            "tidal_download_folder", ""
        )
        self.streamrip_client = TidalClient(self.streamrip_config)
        self.MAX_CONCURRENT_METADATA_REQUESTS = 2
        self.METADATA_RATE_LIMIT = 1.25  # minimum seconds between metadata requests
        self.METADATA_SEMAPHORE = asyncio.Semaphore(
            self.MAX_CONCURRENT_METADATA_REQUESTS
        )
        self.LAST_METADATA_REQUEST = 0.0
        self.MAX_METADATA_RETRIES = 5
        self.METADATA_ALBUM_CACHE: dict[str, dict] = {}
        self.RETRY_DELAY = 1.0  # seconds between retries
        # Callback invoked when a 429 is first observed. Signature: (Exception) -> None or async
        self.on_rate_limit: Optional[Callable[[Exception], Any]] = None
        # Internal flag to avoid repeated notifications for the same runtime
        self._rate_limit_notified = False

    async def rate_limited_request(self, func, *args, **kwargs):
        """Serialize metadata requests through a semaphore and enforce a
        minimum interval (METADATA_RATE_LIMIT seconds) between calls."""
        async with self.METADATA_SEMAPHORE:
            now = time.time()
            elapsed = now - self.LAST_METADATA_REQUEST
            if elapsed < self.METADATA_RATE_LIMIT:
                await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed)
            result = await func(*args, **kwargs)
            self.LAST_METADATA_REQUEST = time.time()
            return result
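
    # Example (a minimal sketch; assumes a logged-in client, and the track ID
    # "12345678" is hypothetical):
    #
    #     sr = SRUtil()
    #     meta = await sr.rate_limited_request(
    #         sr.streamrip_client.get_metadata, "12345678", "track"
    #     )
    #
    # Concurrent callers queue on METADATA_SEMAPHORE, and each call is spaced
    # at least METADATA_RATE_LIMIT seconds after the previous one completes.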

    async def _safe_api_call(
        self, func, *args, retries: int = 2, backoff: float = 0.5, **kwargs
    ):
        """Call an async API function with resilient retry behavior.

        - On AttributeError: attempt a `login()` once and retry.
        - On connection-related errors (aiohttp.ClientError, OSError, Timeout):
          attempt a `login()` and retry up to `retries` times.
        - On 400/429 responses (message contains '400' or '429'): retry with
          backoff without triggering login (to avoid excessive logins).

        Returns the result or raises the last exception.
        """
        last_exc: Optional[Exception] = None
        for attempt in range(retries):
            try:
                return await func(*args, **kwargs)
            except AttributeError as e:
                # Probably missing/closed client internals: try re-login once
                last_exc = e
                try:
                    await self.streamrip_client.login()
                except Exception:
                    pass
                continue
            except Exception as e:
                last_exc = e
                msg = str(e)
                # Treat 400/429 as transient rate-limit/server responses; retry without login
                if ("400" in msg or "429" in msg) and attempt < retries - 1:
                    # Notify on the first observed 429 (if a callback is set)
                    try:
                        if (
                            "429" in msg
                            and not self._rate_limit_notified
                            and self.on_rate_limit
                        ):
                            self._rate_limit_notified = True
                            try:
                                if asyncio.iscoroutinefunction(self.on_rate_limit):
                                    asyncio.create_task(self.on_rate_limit(e))
                                else:
                                    loop = asyncio.get_running_loop()
                                    loop.run_in_executor(None, self.on_rate_limit, e)
                            except Exception:
                                pass
                    except Exception:
                        pass
                    await asyncio.sleep(backoff * (2**attempt))
                    continue
                # Connection-related errors: try to re-login, then retry
                if (
                    isinstance(
                        e,
                        (
                            aiohttp.ClientError,
                            OSError,
                            ConnectionError,
                            asyncio.TimeoutError,
                        ),
                    )
                    or "Connection" in msg
                    or "closed" in msg.lower()
                ):
                    try:
                        await self.streamrip_client.login()
                    except Exception:
                        pass
                    if attempt < retries - 1:
                        await asyncio.sleep(backoff * (2**attempt))
                        continue
        # All retries exhausted: raise the last exception, if any
        if last_exc:
            raise last_exc
        return None
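
    # Example (a minimal sketch mirroring how this wrapper is used below; the
    # query string is hypothetical):
    #
    #     results = await self._safe_api_call(
    #         self.streamrip_client.search,
    #         media_type="artist",
    #         query="Some Artist",
    #         retries=3,
    #     )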

    def is_fuzzy_match(self, expected, actual, threshold=80):
        """Return True if the two strings fuzzy-match at or above `threshold`."""
        if not expected or not actual:
            return False
        return fuzz.token_set_ratio(expected.lower(), actual.lower()) >= threshold

    def is_metadata_match(
        self,
        expected_artist,
        expected_album,
        expected_title,
        found_artist,
        found_album,
        found_title,
        threshold=80,
    ):
        """Fuzzy-match artist, album (if provided), and title against expectations."""
        artist_match = self.is_fuzzy_match(expected_artist, found_artist, threshold)
        album_match = (
            self.is_fuzzy_match(expected_album, found_album, threshold)
            if expected_album
            else True
        )
        title_match = self.is_fuzzy_match(expected_title, found_title, threshold)
        return artist_match and album_match and title_match

    def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
        """Deduplicate dicts by the normalized (stripped, lowercased) value of
        `key`, keeping the first occurrence."""
        deduped = {}
        for entry in entries:
            norm = entry[key].strip().lower()
            if norm not in deduped:
                deduped[norm] = entry
        return list(deduped.values())
    def group_artists_by_name(
        self, entries: list[dict], query: Optional[str] = None
    ) -> list[dict]:
        """
        Group artist entries by normalized display name and pick a primary candidate per name.

        Returns a list of dicts where each dict contains the primary candidate plus
        an `alternatives` list for other artists that share the same display name.

        Scoring/selection policy:
          - If `query` is provided, prefer an exact case-insensitive match.
          - Otherwise prefer the entry with the highest fuzzy match to `query`.
          - Use `popularity` as a tiebreaker.

        This keeps a single line in an autocomplete dropdown while preserving the
        alternate choices (IDs) so the UI can show a submenu or a secondary picker.
        See the sketch after this method for the expected input/output shape.
        """
        buckets: dict[str, list[dict]] = {}
        for e in entries:
            name = e.get("artist", "")
            norm = name.strip().lower()
            buckets.setdefault(norm, []).append(e)

        out: list[dict] = []
        for norm, items in buckets.items():
            if len(items) == 1:
                primary = items[0]
                alternatives: list[dict] = []
            else:
                # Score each item
                scored = []
                for it in items:
                    score = 0.0
                    if query:
                        try:
                            if (
                                it.get("artist", "").strip().lower()
                                == query.strip().lower()
                            ):
                                score += 1000.0
                            else:
                                score += float(
                                    fuzz.token_set_ratio(query, it.get("artist", ""))
                                )
                        except Exception:
                            score += 0.0
                    # Add a small weight for popularity if present
                    pop = it.get("popularity") or 0
                    try:
                        score += float(pop) / 100.0
                    except Exception:
                        pass
                    scored.append((score, it))
                scored.sort(key=lambda x: x[0], reverse=True)
                primary = scored[0][1]
                alternatives = [it for _, it in scored[1:]]
            out.append(
                {
                    "artist": primary.get("artist"),
                    "id": primary.get("id"),
                    "popularity": primary.get("popularity"),
                    "alternatives": alternatives,
                }
            )
        return out
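
    # Example (a minimal sketch; names and IDs are hypothetical):
    #
    #     entries = [
    #         {"artist": "Nirvana", "id": 1, "popularity": 90},
    #         {"artist": "nirvana", "id": 2, "popularity": 10},
    #     ]
    #     grouped = self.group_artists_by_name(entries, query="Nirvana")
    #     # -> [{"artist": "Nirvana", "id": 1, "popularity": 90,
    #     #      "alternatives": [{"artist": "nirvana", "id": 2, ...}]}]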

    def format_duration(self, seconds):
        """Format a duration in seconds as M:SS; returns None for falsy input."""
        if not seconds:
            return None
        m, s = divmod(seconds, 60)
        return f"{m}:{s:02}"

    def _get_tidal_cover_url(self, uuid, size):
        """Generate a Tidal cover URL.

        :param uuid: valid cover UUID string
        :param size: image edge length in pixels (one of the sizes below)
        """
        TIDAL_COVER_URL = (
            "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
        )
        possibles = (80, 160, 320, 640, 1280)
        assert size in possibles, f"size must be in {possibles}"
        return TIDAL_COVER_URL.format(
            uuid=uuid.replace("-", "/"),
            height=size,
            width=size,
        )
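
    # Example (a minimal sketch; the UUID is hypothetical):
    #
    #     url = self._get_tidal_cover_url("aaaa-bbbb-cccc", 640)
    #     # -> "https://resources.tidal.com/images/aaaa/bbbb/cccc/640x640.jpg"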

    def combine_album_track_metadata(
        self, album_json: dict | None, track_json: dict
    ) -> dict:
        """
        Combine album-level and track-level metadata into a unified tag dictionary.
        Track-level metadata overrides album-level where relevant.
        """
        album_json = album_json or {}

        # Album-level
        combined = {
            "album": album_json.get("title"),
            "album_artist": album_json.get("artist", {}).get("name"),
            "release_date": album_json.get("releaseDate"),
            "album_type": album_json.get("type"),
            "total_tracks": album_json.get("numberOfTracks"),
            "upc": album_json.get("upc"),
            "album_copyright": album_json.get("copyright"),
            "album_cover_id": album_json.get("cover"),
            "album_cover_url": f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg"
            if album_json.get("cover")
            else None,
        }

        # Track-level (overrides or adds to album info)
        combined.update(
            {
                "title": track_json.get("title"),
                "artist": track_json.get("artist", {}).get("name"),
                "artists": [a.get("name") for a in track_json.get("artists", [])],
                "track_number": track_json.get("trackNumber"),
                "disc_number": track_json.get("volumeNumber"),
                "duration": track_json.get("duration"),
                "isrc": track_json.get("isrc"),
                "bpm": track_json.get("bpm"),
                "explicit": track_json.get("explicit"),
                "replaygain": track_json.get("replayGain"),
                "peak": track_json.get("peak"),
                "lyrics": track_json.get("lyrics"),
                "track_copyright": track_json.get("copyright"),
                "cover_id": track_json.get("album", {}).get("cover")
                or album_json.get("cover"),
                "cover_url": (
                    f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg"
                    if (
                        track_json.get("album", {}).get("cover")
                        or album_json.get("cover")
                    )
                    else None
                ),
            }
        )
        return combined

    def combine_album_with_all_tracks(
        self, album_json: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Return a list of combined metadata dicts for all tracks in an album JSON."""
        return [
            self.combine_album_track_metadata(album_json, t)
            for t in album_json.get("tracks", [])
        ]
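
    # Example (a minimal sketch with hypothetical, truncated Tidal JSON):
    #
    #     album = {"title": "Nevermind", "artist": {"name": "Nirvana"}, "cover": "uuid"}
    #     track = {"title": "Come as You Are", "artist": {"name": "Nirvana"},
    #              "trackNumber": 3, "duration": 219}
    #     tags = self.combine_album_track_metadata(album, track)
    #     # tags["album"] == "Nevermind"; tags["title"] == "Come as You Are"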

    async def get_artists_by_name(
        self, artist_name: str, group: bool = False
    ) -> Optional[list]:
        """Get artist(s) by name.

        Args:
            artist_name: query string to search for.
            group: if True return grouped results (one primary per display name
                with an `alternatives` list). If False return raw search items
                (legacy shape).

        Retry login only on authentication failure. Rate limit and retry on 400/429.
        """
        artists_out: list[dict] = []
        max_retries = 4
        delay = 1.0
        for attempt in range(max_retries):
            try:
                artists = await self._safe_api_call(
                    self.streamrip_client.search,
                    media_type="artist",
                    query=artist_name,
                    retries=3,
                )
                break
            except Exception as e:
                msg = str(e)
                if ("400" in msg or "429" in msg) and attempt < max_retries - 1:
                    await asyncio.sleep(delay)
                    delay *= 2
                    continue
                else:
                    return None
        else:
            return None
        # `artists` can be None or a list of result pages; guard accordingly
        if not artists:
            return None
        # If the client returned paged results (list), pick the first page dict
        if isinstance(artists, list):
            artists_page = artists[0] if len(artists) > 0 else {}
        else:
            artists_page = artists
        artists_items = (
            artists_page.get("items", []) if isinstance(artists_page, dict) else []
        )
        if not artists_items:
            return None
        artists_out = [
            {
                "artist": res["name"],
                "id": res["id"],
                "popularity": res.get("popularity", 0),
            }
            for res in artists_items
            if "name" in res and "id" in res
        ]
        if group:
            return self.group_artists_by_name(artists_out, query=artist_name)
        return artists_out

    async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
        """Get albums by artist ID. Retry login only on authentication failure.
        Rate limit and retry on 400/429."""
        artist_id_str: str = str(artist_id)
        albums_out: list[dict] = []
        max_retries = 4
        delay = 1.0
        for attempt in range(max_retries):
            try:
                metadata = await self._safe_api_call(
                    self.streamrip_client.get_metadata,
                    artist_id_str,
                    "artist",
                    retries=3,
                )
                break
            except Exception as e:
                msg = str(e)
                if ("400" in msg or "429" in msg) and attempt < max_retries - 1:
                    await asyncio.sleep(delay)
                    delay *= 2
                    continue
                else:
                    return None
        else:
            return None
        if not metadata:
            return None
        albums = self.dedupe_by_key("title", metadata.get("albums", []))
        albums_out = [
            {
                "artist": ", ".join(artist["name"] for artist in album["artists"]),
                "album": album["title"],
                "id": album["id"],
                "release_date": album.get("releaseDate", "Unknown"),
            }
            for album in albums
            if "title" in album and "id" in album and "artists" in album
        ]
        return albums_out

    async def get_album_by_name(self, artist: str, album: str) -> Optional[dict]:
        """Get an album by artist and album name using the artist ID and fuzzy
        matching. Searches with the first 8 characters of the artist name, then
        the first 12 if no match. (Success notification lives in add_cover_art.py.)
        """
        for trunc in (8, 12):
            search_artist = artist[:trunc]
            artists = await self.get_artists_by_name(search_artist)
            if not artists:
                continue
            best_artist = None
            best_artist_score = 0
            for a in artists:
                score = fuzz.token_set_ratio(artist, a["artist"])
                if score > best_artist_score:
                    best_artist = a
                    best_artist_score = int(score)
            if not best_artist or best_artist_score < 85:
                continue
            artist_id = best_artist["id"]
            albums = await self.get_albums_by_artist_id(artist_id)
            if not albums:
                continue
            best_album = None
            best_album_score = 0
            for alb in albums:
                score = fuzz.token_set_ratio(album, alb["album"])
                if score > best_album_score:
                    best_album = alb
                    best_album_score = int(score)
            if best_album and best_album_score >= 85:
                return best_album
        return None
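
    # Example (a minimal sketch; both strings are hypothetical inputs):
    #
    #     best = await self.get_album_by_name("Nirvana", "Nevermind")
    #     # -> {"artist": "Nirvana", "album": "Nevermind", "id": ..., "release_date": ...}
    #     # or None when no candidate clears the 85-point fuzzy threshold.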

    async def get_cover_by_album_id(
        self, album_id: int, size: int = 640
    ) -> Optional[str]:
        """Get cover URL by album ID. Retry login only on authentication failure."""
        if size not in [80, 160, 320, 640, 1280]:
            return None
        album_id_str: str = str(album_id)
        for attempt in range(2):
            try:
                metadata = await self._safe_api_call(
                    self.streamrip_client.get_metadata,
                    item_id=album_id_str,
                    media_type="album",
                    retries=2,
                )
                break
            except Exception:
                if attempt == 1:
                    return None
        else:
            return None
        if not metadata:
            return None
        cover_id = metadata.get("cover")
        if not cover_id:
            return None
        cover_url = self._get_tidal_cover_url(cover_id, size)
        return cover_url

    async def get_tracks_by_album_id(
        self, album_id: int, quality: str = "FLAC"
    ) -> Optional[list | dict]:
        """Get tracks by album ID.

        Args:
            album_id (int): The ID of the album.
            quality (str): Currently unused; kept for API compatibility.
        Returns:
            Optional[list[dict]]: List of tracks or None if not found.
        """
        album_id_str = str(album_id)
        try:
            metadata = await self._safe_api_call(
                self.streamrip_client.get_metadata,
                item_id=album_id_str,
                media_type="album",
                retries=2,
            )
        except Exception as e:
            logging.warning("get_tracks_by_album_id failed: %s", e)
            return None
        if not metadata:
            logging.warning("No metadata found for album ID: %s", album_id)
            return None
        track_list = metadata.get("tracks", [])
        tracks_out: list[dict] = [
            {
                "id": track.get("id"),
                "artist": track.get("artist", {}).get("name"),
                "title": track.get("title"),
                "duration": self.format_duration(track.get("duration", 0)),
                "version": track.get("version"),
                "audioQuality": track.get("audioQuality"),
            }
            for track in track_list
        ]
        return tracks_out

    async def get_tracks_by_artist_song(
        self, artist: str, song: str, n: int = 0
    ) -> Optional[list]:
        """Get tracks by artist and song name.

        Args:
            artist (str): The name of the artist.
            song (str): The name of the song.
            n (int): Internal retry counter (up to 3 retries on failure).
        Returns:
            Optional[list]: Search result items, or [] if the search failed.
        """
        try:
            search_res = await self._safe_api_call(
                self.streamrip_client.search,
                media_type="track",
                query=f"{artist} - {song}",
                retries=3,
            )
            logging.debug("Result: %s", search_res)
            return (
                search_res[0].get("items")
                if search_res and isinstance(search_res, list)
                else []
            )
        except Exception as e:
            traceback.print_exc()
            logging.critical("Search Exception: %s", str(e))
            if n < 3:
                n += 1
                return await self.get_tracks_by_artist_song(artist, song, n)
            return []

    async def get_stream_url_by_track_id(
        self, track_id: int, quality: str = "FLAC"
    ) -> Optional[str]:
        """Get stream URL by track ID.

        Args:
            track_id (int): The ID of the track.
            quality (str): The quality of the stream; "FLAC" (default) or "Lossy".
        Returns:
            Optional[str]: The stream URL or None if not found.
        """
        if quality not in ["FLAC", "Lossy"]:
            logging.error("Invalid quality requested: %s", quality)
            return None
        quality_int: int = int(self.streamrip_config.session.tidal.quality)
        match quality:
            case "FLAC":
                quality_int = 2
            case "Lossy":
                quality_int = 1
        track_id_str: str = str(track_id)
        # _safe_api_call re-logs-in as needed, so no explicit login here
        try:
            logging.debug("Using quality_int: %s", quality_int)
            track = await self._safe_api_call(
                self.streamrip_client.get_downloadable,
                track_id=track_id_str,
                quality=quality_int,
                retries=3,
            )
        except Exception as e:
            logging.warning("get_stream_url_by_track_id failed: %s", e)
            return None
        if not track:
            logging.warning("No track found for ID: %s", track_id)
            return None
        stream_url = track.url
        if not stream_url:
            logging.warning("No stream URL found for track ID: %s", track_id)
            return None
        return stream_url

    async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
        """
        Fetch track + album metadata with retries, caching album data.
        Returns a combined metadata dict, or raises MetadataFetchError after
        exhausting retries.
        """
        for attempt in range(1, self.MAX_METADATA_RETRIES + 1):
            try:
                await self._safe_api_call(self.streamrip_client.login, retries=1)
                # Track metadata
                metadata = await self.rate_limited_request(
                    self.streamrip_client.get_metadata, str(track_id), "track"
                )
                album_id = metadata.get("album", {}).get("id")
                album_metadata = None
                if album_id:
                    # Check cache first
                    if album_id in self.METADATA_ALBUM_CACHE:
                        album_metadata = self.METADATA_ALBUM_CACHE[album_id]
                    else:
                        album_metadata = await self.rate_limited_request(
                            lambda i, t: self._safe_api_call(
                                self.streamrip_client.get_metadata, i, t, retries=2
                            ),
                            album_id,
                            "album",
                        )
                        if not album_metadata:
                            return None
                        self.METADATA_ALBUM_CACHE[album_id] = album_metadata
                # Combine track + album metadata
                if not album_metadata:
                    return None
                combined_metadata: dict = self.combine_album_track_metadata(
                    album_metadata, metadata
                )
                # Include album id so callers can fetch cover art if desired
                combined_metadata["album_id"] = album_id
                logging.info(
                    "Combined metadata for track ID %s (attempt %d): %s",
                    track_id,
                    attempt,
                    combined_metadata,
                )
                return combined_metadata
            except Exception as e:
                # Exponential backoff with jitter for 429 or other errors
                delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
                logging.warning(
                    "Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs",
                    track_id,
                    attempt,
                    self.MAX_METADATA_RETRIES,
                    str(e),
                    delay,
                )
                if attempt < self.MAX_METADATA_RETRIES:
                    await asyncio.sleep(delay)
                else:
                    logging.error(
                        "Metadata fetch failed permanently for track %s after %d attempts",
                        track_id,
                        self.MAX_METADATA_RETRIES,
                    )
                    # Raise a specific exception so callers can react (e.g. notify)
                    raise MetadataFetchError(
                        f"Metadata fetch failed permanently for track {track_id} after {self.MAX_METADATA_RETRIES} attempts: {e}"
                    )
        # If we reach here without returning, raise a generic metadata error
        raise MetadataFetchError(f"Metadata fetch failed for track {track_id}")

    async def download(self, track_id: int, quality: str = "FLAC") -> bool | str:
        """Download a track to the configured downloads folder.

        Args:
            track_id (int)
            quality (str): The quality of the stream; "FLAC" (default) or "Lossy".
        Returns:
            str | bool: Path to the downloaded file, or False on failure.
        """
        try:
            await self._safe_api_call(self.streamrip_client.login, retries=1)
            track_url = await self.get_stream_url_by_track_id(track_id, quality)
            if not track_url:
                return False
            parsed_url = urlparse(track_url)
            parsed_url_filename = os.path.basename(parsed_url.path)
            # splitext keeps the leading dot, e.g. ".flac"
            parsed_url_ext = os.path.splitext(parsed_url_filename)[1]
            unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
            dl_folder_path = (
                f"{self.streamrip_config.session.downloads.folder}/{unique}"
            )
            dl_path = f"{dl_folder_path}/{track_id}{parsed_url_ext}"
            # Ensure the download folder exists
            try:
                os.makedirs(dl_folder_path, exist_ok=True)
            except Exception:
                pass
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60)
                ) as resp:
                    resp.raise_for_status()
                    with open(dl_path, "wb") as f:
                        async for chunk in resp.content.iter_chunked(1024 * 64):
                            f.write(chunk)
            return dl_path
        except Exception as e:
            logging.critical("Error: %s", str(e))
            return False

    async def get_lrc_by_track_id(self, track_id: int) -> Optional[str]:
        """Get LRC lyrics by track ID."""
        logging.info("SR: Fetching metadata for track ID %s", track_id)
        metadata = await self.get_metadata_by_track_id(track_id)
        lrc = metadata.get("lyrics") if metadata else None
        logging.info("SR: LRC %s", "found" if lrc else "not found")
        return lrc

    async def get_lrc_by_artist_song(
        self,
        artist: str,
        song: str,
        album: Optional[str] = None,
        duration: Optional[int] = None,
    ) -> Optional[str]:
        """Get LRC lyrics by artist and song, optionally filtering by album and duration."""
        logging.info("SR: Searching tracks for %s - %s", artist, song)
        tracks = await self.get_tracks_by_artist_song(artist, song)
        logging.info("SR: Found %d tracks", len(tracks) if tracks else 0)
        if not tracks:
            return None

        # Filter by album if provided
        if album:
            tracks = [
                t
                for t in tracks
                if t.get("album", {}).get("title", "").lower() == album.lower()
            ]
            if not tracks:
                return None

        # If a duration is provided, select the track with the closest duration
        if duration is not None:
            tracks_with_diff = [
                (t, abs(t.get("duration", 0) - duration)) for t in tracks
            ]
            tracks_with_diff.sort(key=lambda x: x[1])
            best_track, min_diff = tracks_with_diff[0]
            logging.info("SR: Best match duration diff: %ss", min_diff)
            # If the closest match is more than 10 seconds off, consider it no match
            if min_diff > 10:
                logging.info("SR: Duration diff too large, no match")
                return None
        else:
            best_track = tracks[0]

        track_id = best_track.get("id")
        logging.info("SR: Using track ID %s", track_id)
        if not track_id:
            return None

        return await self.get_lrc_by_track_id(track_id)
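

# Minimal manual smoke test: a hedged sketch, assuming valid Tidal credentials
# are present in the environment (.env). The artist/song strings below are
# hypothetical inputs, not fixtures shipped with this module.
if __name__ == "__main__":

    async def _demo() -> None:
        sr = SRUtil()
        artists = await sr.get_artists_by_name("Nirvana", group=True)
        print("artists:", artists)
        lrc = await sr.get_lrc_by_artist_song("Nirvana", "Come as You Are")
        print("lrc found:", bool(lrc))

    asyncio.run(_demo())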