From f6d4ed57f35fe5f8ba1e331d8e913974999c05a6 Mon Sep 17 00:00:00 2001
From: codey
Date: Tue, 9 Sep 2025 15:50:13 -0400
Subject: [PATCH] misc

---
 endpoints/misc.py       |  10 +--
 endpoints/rip.py        |   9 ++-
 utils/meme_util.py      |   3 +
 utils/rip_background.py | 175 ++++++++++++++++++++++++----------------
 utils/sr_wrapper.py     | 117 +++++++++++++++++++++++----
 5 files changed, 225 insertions(+), 89 deletions(-)

diff --git a/endpoints/misc.py b/endpoints/misc.py
index 67656bb..98d8854 100644
--- a/endpoints/misc.py
+++ b/endpoints/misc.py
@@ -13,7 +13,7 @@ from rq.registry import (
     StartedJobRegistry,
     FinishedJobRegistry,
     FailedJobRegistry,
-    DeferredJobRegistry
+    DeferredJobRegistry,
 )
 
 from lyric_search.sources import private, cache as LyricsCache, redis_cache
@@ -166,7 +166,7 @@ class Misc(FastAPI):
                 "sessions": -1,
             }
         )
-        
+
     async def homepage_rq_widget(self) -> JSONResponse:
         """
         Homepage RQ Widget Handler
@@ -176,8 +176,8 @@ class Misc(FastAPI):
         queued = queue.count
         started = StartedJobRegistry(queue_name, connection=self.redis_client).count
         failed = FailedJobRegistry(queue_name, connection=self.redis_client).count
-        finished = FinishedJobRegistry(queue_name, connection=self.redis_client).count 
-        deferred = DeferredJobRegistry(queue_name, connection=self.redis_client).count 
+        finished = FinishedJobRegistry(queue_name, connection=self.redis_client).count
+        deferred = DeferredJobRegistry(queue_name, connection=self.redis_client).count
 
         return JSONResponse(
             content={
@@ -189,7 +189,7 @@ class Misc(FastAPI):
                     "deferred": deferred,
                 }
             }
-        ) 
+        )
 
     async def homepage_sqlite_widget(self) -> JSONResponse:
         """
diff --git a/endpoints/rip.py b/endpoints/rip.py
index 5ce2f77..30d2caf 100644
--- a/endpoints/rip.py
+++ b/endpoints/rip.py
@@ -120,7 +120,7 @@
         album_id: int,
         request: Request,
         user=Depends(get_current_user),
-        quality: Literal["FLAC", "Lossy"] = "FLAC"
+        quality: Literal["FLAC", "Lossy"] = "FLAC",
     ) -> Response:
         """Get tracks by album id"""
         tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality)
@@ -269,7 +269,12 @@
 
         # ---- Sort newest first ----
         def job_sort_key(job):
-            return job.get("ended_at") or job.get("started_at") or job.get("enqueued_at") or 0
+            return (
+                job.get("ended_at")
+                or job.get("started_at")
+                or job.get("enqueued_at")
+                or 0
+            )
 
         jobs_info.sort(key=job_sort_key, reverse=True)
 
diff --git a/utils/meme_util.py b/utils/meme_util.py
index a955577..d2061ba 100644
--- a/utils/meme_util.py
+++ b/utils/meme_util.py
@@ -26,6 +26,7 @@ class MemeUtil:
             bool
         """
         # Accepts either bytes or a BytesIO-like object
+        signature = None
         if isinstance(buffer, io.BytesIO):
             if hasattr(buffer, "read") and hasattr(buffer, "seek"):
                 pos = buffer.tell()
@@ -153,6 +154,8 @@ class MemeUtil:
         query: str = "SELECT count(id) AS count FROM memes"
         async with await db_conn.execute(query) as db_cursor:
             result = await db_cursor.fetchone()
+            if not result:
+                return None
             count = result["count"]
             if not isinstance(count, int):
                 return None
diff --git a/utils/rip_background.py b/utils/rip_background.py
index 6b29058..d0c42c9 100644
--- a/utils/rip_background.py
+++ b/utils/rip_background.py
@@ -3,22 +3,24 @@
 import asyncio
 import random
 import os
 import tarfile
+import traceback
 import uuid
 import subprocess
 import shutil
 import re
 from pathlib import Path
 from urllib.parse import urlparse, unquote
-
 import aiohttp
+from datetime import datetime
+from mediafile import MediaFile  # type: ignore[import]
 from rq import get_current_job
 
 from utils.sr_wrapper import SRUtil
 
 # ---------- Config ----------
 ROOT_DIR = Path("/storage/music2")
-MAX_RETRIES = 3
-THROTTLE_MIN = 0.3
-THROTTLE_MAX = 1.0
+MAX_RETRIES = 5
+THROTTLE_MIN = 1.7
+THROTTLE_MAX = 10.0
 
 HEADERS = {
     "User-Agent": (
@@ -42,6 +44,48 @@
 sr = SRUtil()
 
 # ---------- Helpers ----------
+def tag_with_mediafile(file_path: str, meta: dict):
+    f = MediaFile(file_path)
+
+    # --- Helper to safely set textual/number fields ---
+    def safe_set(attr, value, default=None, cast=None):
+        if value is None:
+            value = default
+        if value is not None:
+            if cast is not None:
+                setattr(f, attr, cast(value))
+            else:
+                setattr(f, attr, str(value))
+
+    # --- Basic textual metadata ---
+    safe_set("title", meta.get("title"), default="Unknown Title")
+    safe_set("artist", meta.get("artist"), default="Unknown Artist")
+    safe_set("albumartist", meta.get("album_artist"), default="Unknown Artist")
+    safe_set("album", meta.get("album"), default="Unknown Album")
+    safe_set("track", meta.get("track_number"), default=0, cast=int)
+    safe_set("disc", meta.get("disc_number"), default=0, cast=int)
+    safe_set("isrc", meta.get("isrc"), default="")
+    safe_set("bpm", meta.get("bpm"), default=0, cast=int)
+
+    # --- Release date ---
+    release_date_str = meta.get("release_date")
+    release_date_obj = None
+    if release_date_str:
+        try:
+            release_date_obj = datetime.fromisoformat(release_date_str).date()
+        except ValueError:
+            try:
+                # fallback if only year string
+                release_date_obj = datetime(int(release_date_str[:4]), 1, 1).date()
+            except Exception:
+                pass
+    if release_date_obj:
+        f.date = release_date_obj
+
+    # --- Save all tags ---
+    f.save()
+
+
 def cleanup_empty_dirs(root: Path):
     """
     Recursively remove any directories under root that contain no files
@@ -49,11 +93,10 @@ def tag_with_mediafile(file_path: str, meta: dict):
     """
     for dirpath, dirnames, filenames in os.walk(root, topdown=False):
         p = Path(dirpath)
-        # Check if there are any files in this directory or subdirectories
        has_file = any(f.is_file() for f in p.rglob("*"))
         if not has_file:
             try:
-                p.rmdir()  # safe to remove empty dirs
+                p.rmdir()
             except Exception:
                 pass
 
@@ -62,41 +105,33 @@
 def sanitize_filename(name: str) -> str:
     """Make a string safe for file/dir names."""
     if not name:
         return "Unknown"
-    # Replace path separators first
     name = name.replace("/", "-").replace("\\", "-")
-    # Remove illegal characters on common filesystems
     name = re.sub(r'[<>:"|?*\x00-\x1F]', "", name)
-    # Trim spaces and trailing dots
     name = name.strip().strip(".")
-    # Collapse whitespace
     name = re.sub(r"\s+", " ", name)
-    # Reasonable length cap
     return name[:180] or "Unknown"
 
+
 def ensure_unique_path(p: Path) -> Path:
     """
-    Ensure the given file or directory path is unique.
-    If a conflict exists, append (2), (3), ... until it's unique.
+    Ensure the given file path is unique *within its parent folder*.
+    Only appends (2), (3)... if a real conflict exists in that folder.
""" parent = p.parent stem, suffix = p.stem, p.suffix + existing = {f.name for f in parent.glob(f"*{suffix}") if f.is_file()} - # If suffix is empty → directory case - if p.is_dir() or suffix == "": - candidate = parent / stem - counter = 2 - while candidate.exists(): - candidate = parent / f"{stem} ({counter})" - counter += 1 - return candidate + candidate = f"{stem}{suffix}" + if candidate not in existing: + return parent / candidate - # File case - candidate = parent / f"{stem}{suffix}" counter = 2 - while candidate.exists(): - candidate = parent / f"{stem} ({counter}){suffix}" + while True: + candidate = f"{stem} ({counter}){suffix}" + if candidate not in existing: + return parent / candidate counter += 1 - return candidate + # ---------- Job ---------- def bulk_download(track_list: list, quality: str = "FLAC"): @@ -109,12 +144,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"): - returns [tarball_path] """ job = get_current_job() + job_id = job.id if job else uuid.uuid4().hex + staging_root = ROOT_DIR / job_id - # Initialize job meta in a JSON/pickle-safe way if job: try: job.meta["track_ids"] = [str(t) for t in (track_list or [])] - job.meta["tracks"] = [] # will hold per-track dicts + job.meta["tracks"] = [] job.meta["progress"] = 0 job.meta["tarball"] = None job.meta["status"] = "Started" @@ -123,9 +159,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"): logging.warning("Failed to init job.meta: %s", e) async def process_tracks(): - per_track_meta = [] # list of per-track dicts (JSON-safe) - all_final_files = [] # list[Path] - all_artists = set() # set[str] + per_track_meta = [] + all_final_files = [] + all_artists = set() (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True) @@ -139,10 +175,10 @@ def bulk_download(track_list: list, quality: str = "FLAC"): for i, track_id in enumerate(track_list or []): track_info = { "track_id": str(track_id), - "status": "Pending", # Pending | Success | Failed - "file_path": None, # str | None - "error": None, # str | None - "attempts": 0, # int + "status": "Pending", + "file_path": None, + "error": None, + "attempts": 0, } attempt = 0 @@ -152,31 +188,27 @@ def bulk_download(track_list: list, quality: str = "FLAC"): track_info["attempts"] = attempt try: - # 1) Stream URL url = await sr.get_stream_url_by_track_id(track_id, quality) if not url: raise RuntimeError("No stream URL") - # 2) Extension from URL path only (no query) parsed = urlparse(url) clean_path = unquote(parsed.path) ext = Path(clean_path).suffix or ".mp3" - # Unique temp file tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}") - # 3) Download (chunked) async with session.get(url) as resp: resp.raise_for_status() with open(tmp_file, "wb") as f: async for chunk in resp.content.iter_chunked(64 * 1024): f.write(chunk) - # 4) Metadata from SR (prefer API over tags) md = await sr.get_metadata_by_track_id(track_id) or {} + logging.info("Metadata for %s: %s", track_id, md) artist_raw = md.get("artist") or "Unknown Artist" album_raw = md.get("album") or "Unknown Album" - title_raw = md.get("song") or f"Track {track_id}" + title_raw = md.get("title") or f"Track {track_id}" artist = sanitize_filename(artist_raw) album = sanitize_filename(album_raw) @@ -184,18 +216,16 @@ def bulk_download(track_list: list, quality: str = "FLAC"): all_artists.add(artist) - # 5) Final path - artist_dir = ROOT_DIR / artist + artist_dir = staging_root / artist album_dir = artist_dir / album album_dir.mkdir(parents=True, exist_ok=True) - # Only ensure uniqueness at the file level 
                         final_file = ensure_unique_path(album_dir / f"{title}{ext}")
 
+                        tag_with_mediafile(str(tmp_file), md)
                         tmp_file.rename(final_file)
-                        tmp_file = None  # consumed
+                        tmp_file = None
 
-                        # Track success
                         track_info["status"] = "Success"
                         track_info["file_path"] = str(final_file)
                         track_info["error"] = None
@@ -204,26 +234,37 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
                         all_final_files.append(final_file)
                         if job:
                             job.meta["progress"] = int(((i + 1) / total) * 100)
                             job.save_meta()
-                        break  # success; exit retry loop
+                        break
+
+                    except aiohttp.ClientResponseError as e:
+                        if e.status == 429:
+                            wait_time = min(60, 2**attempt)  # exponential up to 60s
+                            logging.warning(
+                                "Rate limited (429). Sleeping %s seconds", wait_time
+                            )
+                            await asyncio.sleep(wait_time)
+                        else:
+                            await asyncio.sleep(
+                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
+                            )
 
                     except Exception as e:
                         logging.error(
                             "Track %s attempt %s failed: %s", track_id, attempt, e
                         )
+                        traceback.print_exc()
                         track_info["error"] = str(e)
                         if attempt >= MAX_RETRIES:
                             track_info["status"] = "Failed"
-                        # small backoff before next attempt (or next track)
                         await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
 
                     finally:
-                        # Clean partial temp file on failure
                         try:
+                            await session.close()
                             if tmp_file and tmp_file.exists():
                                 tmp_file.unlink()
                         except Exception:
                             pass
 
-                # Update RQ meta after each track
                 per_track_meta.append(track_info)
                 if job:
                     try:
@@ -234,10 +275,8 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
                             "Failed to update job.meta after track %s: %s", track_id, e
                         )
 
-                # Throttle between tracks
                 await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
 
-        # ---- Single combined tarball for all tracks ----
         if not all_final_files:
             if job:
                 try:
@@ -248,12 +287,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
                     pass
             return []
 
-        # Pick artist with the most tracks
        artist_counts: dict[str, int] = {}
         for t in per_track_meta:
             if t["status"] == "Success" and t.get("file_path"):
                 try:
-                    artist = Path(t["file_path"]).relative_to(ROOT_DIR).parts[0]
+                    artist = Path(t["file_path"]).relative_to(staging_root).parts[0]
                 except Exception:
                     artist = "Unknown Artist"
                 artist_counts[artist] = artist_counts.get(artist, 0) + 1
@@ -267,9 +305,8 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
         combined_artist = sanitize_filename(top_artist)
         short_id = uuid.uuid4().hex[:8]
+        staged_tarball = staging_root / f"{combined_artist}_{short_id}.tar.gz"
 
-        # Stage tarball in ROOT_DIR first
-        staged_tarball = ROOT_DIR / f"{combined_artist}_{short_id}.tar.gz"
         final_tarball = ROOT_DIR / "completed" / quality / staged_tarball.name
         final_tarball.parent.mkdir(parents=True, exist_ok=True)
 
@@ -282,29 +319,32 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
 
         logging.info("Creating tarball: %s", staged_tarball)
 
-        # Run blocking tar creation in background thread
         def _create_tar_sync():
             try:
                 subprocess.run(
                     [
-                        "tar", "-I", "pigz -9", "-cf", str(staged_tarball),
-                        "-C", str(ROOT_DIR)
-                    ] + [str(f.relative_to(ROOT_DIR)) for f in all_final_files],
-                    check=True
+                        "tar",
+                        "-I",
+                        "pigz -9",
+                        "-cf",
+                        str(staged_tarball),
+                        "-C",
+                        str(staging_root),
+                    ]
+                    + [str(f.relative_to(staging_root)) for f in all_final_files],
+                    check=True,
                 )
-                # cleanup files after successful tar
                 for f in all_final_files:
                     try:
                         os.remove(f)
                     except Exception:
                         pass
             except FileNotFoundError:
-                # pigz or tar not available → fallback to Python tarfile
                 logging.warning("pigz not available, falling back to tarfile (slower).")
                 with tarfile.open(staged_tarball, "w:gz") as tar:
                     for f in all_final_files:
                         try:
-                            arcname = f.relative_to(ROOT_DIR)
+                            arcname = f.relative_to(staging_root)
                         except ValueError:
                             arcname = f.name
                         tar.add(f, arcname=str(arcname))
@@ -315,7 +355,6 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
 
         await asyncio.to_thread(_create_tar_sync)
 
-        # sanity check
         if not staged_tarball.exists():
             logging.error("Tarball was not created: %s", staged_tarball)
             if job:
@@ -328,14 +367,15 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
 
         logging.critical("Tarball created: %s", staged_tarball)
 
-        # Now move tarball into completed folder
         try:
             staged_tarball.rename(final_tarball)
         except Exception:
             shutil.move(str(staged_tarball), str(final_tarball))
 
         logging.critical("Tarball finalized: %s", final_tarball)
-        await asyncio.to_thread(cleanup_empty_dirs, ROOT_DIR)
+
+        await asyncio.to_thread(shutil.rmtree, staging_root, ignore_errors=True)
+
         if job:
             job.meta["tarball"] = str(final_tarball)
             job.meta["progress"] = 100
@@ -344,7 +384,6 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
 
         return [str(final_tarball)]
 
-    # Run async part synchronously for RQ
     loop = asyncio.new_event_loop()
     asyncio.set_event_loop(loop)
     try:
diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py
index eadb67d..3bf2ce3 100644
--- a/utils/sr_wrapper.py
+++ b/utils/sr_wrapper.py
@@ -1,8 +1,9 @@
-from typing import Optional
+from typing import Optional, Any
 from uuid import uuid4
 from urllib.parse import urlparse
 import hashlib
 import logging
+import asyncio
 import os
 import aiohttp
 from streamrip.client import TidalClient  # type: ignore
@@ -43,6 +44,8 @@ class SRUtil:
         )
         self.streamrip_config
         self.streamrip_client = TidalClient(self.streamrip_config)
+        self.MAX_METADATA_RETRIES = 5
+        self.RETRY_DELAY = 1.0  # seconds between retries
 
     def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
         deduped = {}
@@ -58,6 +61,64 @@ class SRUtil:
         m, s = divmod(seconds, 60)
         return f"{m}:{s:02}"
 
+    def combine_album_track_metadata(
+        self, album_json: dict[str, Any], track_json: dict[str, Any]
+    ) -> dict[str, Any]:
+        """
+        Combine album-level and track-level metadata into a unified tag dictionary.
+        If track_json comes from album_json['tracks'], it will override album-level values where relevant.
+        """
+        # Album-level
+        combined = {
+            "album": album_json.get("title"),
+            "album_artist": album_json.get("artist", {}).get("name"),
+            "release_date": album_json.get("releaseDate"),
+            "album_type": album_json.get("type"),
+            "total_tracks": album_json.get("numberOfTracks"),
+            "upc": album_json.get("upc"),
+            "album_copyright": album_json.get("copyright"),
+            "album_cover_id": album_json.get("cover"),
+            "album_cover_url": f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg"
+            if album_json.get("cover")
+            else None,
+        }
+
+        # Track-level (overrides or adds to album info)
+        combined.update(
+            {
+                "title": track_json.get("title"),
+                "artist": track_json.get("artist", {}).get("name"),
+                "artists": [a.get("name") for a in track_json.get("artists", [])],
+                "track_number": track_json.get("trackNumber"),
+                "disc_number": track_json.get("volumeNumber"),
+                "duration": track_json.get("duration"),
+                "isrc": track_json.get("isrc"),
+                "bpm": track_json.get("bpm"),
+                "explicit": track_json.get("explicit"),
+                "replaygain": track_json.get("replayGain"),
+                "peak": track_json.get("peak"),
+                "lyrics": track_json.get("lyrics"),
+                "track_copyright": track_json.get("copyright"),
+                "cover_id": track_json.get("album", {}).get(
+                    "cover", album_json.get("cover")
+                ),
+                "cover_url": f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg"
+                if (track_json.get("album", {}).get("cover") or album_json.get("cover"))
+                else None,
+            }
+        )
+
+        return combined
+
+    def combine_album_with_all_tracks(
+        self, album_json: dict[str, Any]
+    ) -> list[dict[str, Any]]:
+        """Return a list of combined metadata dicts for all tracks in an album JSON."""
+        return [
+            self.combine_album_track_metadata(album_json, t)
+            for t in album_json.get("tracks", [])
+        ]
+
     async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
         """Get artist(s) by name.
         Args:
@@ -220,19 +281,47 @@ class SRUtil:
         return stream_url
 
     async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
-        try:
-            await self.streamrip_client.login()
-            metadata = await self.streamrip_client.get_metadata(str(track_id), "track")
-            return {
-                "artist": metadata.get("artist", {}).get("name", "Unknown Artist"),
-                "album": metadata.get("album", {}).get("title", "Unknown Album"),
-                "song": metadata.get("title", uuid4()),
-            }
-        except Exception as e:
-            logging.critical(
-                "Get metadata for %s failed, Exception: %s", track_id, str(e)
-            )
-            return None
+        """
+        Fetch track + album metadata with retries.
+        Returns combined metadata dict or None after exhausting retries.
+        """
+        for attempt in range(1, self.MAX_METADATA_RETRIES + 1):
+            try:
+                await self.streamrip_client.login()
+                metadata = await self.streamrip_client.get_metadata(
+                    str(track_id), "track"
+                )
+                album_id = metadata.get("album", {}).get("id")
+                album_metadata = await self.streamrip_client.get_metadata(
+                    album_id, "album"
+                )
+                combined_metadata: dict = self.combine_album_track_metadata(
+                    album_metadata, metadata
+                )
+                logging.info(
+                    "Combined metadata for track ID %s (attempt %d): %s",
+                    track_id,
+                    attempt,
+                    combined_metadata,
+                )
+                return combined_metadata
+            except Exception as e:
+                logging.warning(
+                    "Metadata fetch failed for track %s (attempt %d/%d): %s",
+                    track_id,
+                    attempt,
+                    self.MAX_METADATA_RETRIES,
+                    str(e),
+                )
+                if attempt < self.MAX_METADATA_RETRIES:
+                    await asyncio.sleep(self.RETRY_DELAY)
+                else:
+                    logging.error(
+                        "Metadata fetch failed permanently for track %s after %d attempts",
+                        track_id,
+                        self.MAX_METADATA_RETRIES,
+                    )
+                    return None
 
     async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str:
         """Download track