From 22eaa2260e629e00d6e085cb7b4a82988fc1ff52 Mon Sep 17 00:00:00 2001 From: codey Date: Thu, 21 Aug 2025 15:06:56 -0400 Subject: [PATCH] another commit without a list of specific changes! (misc) --- endpoints/rip.py | 168 +++++++++++++++++++++++----------------- utils/rip_background.py | 24 +++++- utils/sr_wrapper.py | 16 ++-- 3 files changed, 125 insertions(+), 83 deletions(-) diff --git a/endpoints/rip.py b/endpoints/rip.py index a29dc07..0765553 100644 --- a/endpoints/rip.py +++ b/endpoints/rip.py @@ -5,17 +5,23 @@ from fastapi.responses import JSONResponse from utils.sr_wrapper import SRUtil from auth.deps import get_current_user from redis import Redis -from rq import Queue +from rq import Queue, Retry from rq.job import Job from rq.job import JobStatus +from rq.registry import ( + StartedJobRegistry, DeferredJobRegistry, + FinishedJobRegistry, FailedJobRegistry, + ScheduledJobRegistry) from utils.rip_background import bulk_download from lyric_search.sources import private +from typing import Literal from pydantic import BaseModel class ValidBulkFetchRequest(BaseModel): track_ids: list[int] target: str + quality: Literal["FLAC", "Lossy"] class RIP(FastAPI): @@ -38,7 +44,7 @@ class RIP(FastAPI): "dls", connection=self.redis_conn, default_timeout=14400, - default_result_ttl=86400, + default_result_ttl=-1, default_failure_ttl=86400, ) self.endpoints: dict = { @@ -62,6 +68,30 @@ class RIP(FastAPI): dependencies=dependencies, ) + def _format_job(self, job: Job): + """Helper to normalize job data into JSON.""" + job_status: str | JobStatus = job.get_status() + progress = job.meta.get("progress", 0) + if progress == 100 and not job.meta.get("tarball"): + job_status = "compressing" + + tracks_in = job.meta.get("tracks_in") + tracks_out = len(job.meta.get("tracks", [])) + + return { + "id": job.id, + "status": job_status, + "result": job.result, + "tarball": job.meta.get("tarball"), + "enqueued_at": job.enqueued_at, + "started_at": job.started_at, + "ended_at": job.ended_at, + "progress": progress, + "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out, + "target": job.meta.get("target"), + "quality": job.meta.get("quality", "Unknown"), + } + async def artists_by_name_handler( self, artist: str, request: Request, user=Depends(get_current_user) ) -> Response: @@ -81,10 +111,11 @@ class RIP(FastAPI): return JSONResponse(content=albums) async def tracks_by_album_id_handler( - self, album_id: int, request: Request, user=Depends(get_current_user) + self, album_id: int, request: Request, user=Depends(get_current_user), + quality: str = "FLAC" ) -> Response: """Get tracks by album id""" - tracks = await self.trip_util.get_tracks_by_album_id(album_id) + tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality) if not tracks: return Response(status_code=404, content="Not Found") return JSONResponse(content=tracks) @@ -100,10 +131,10 @@ class RIP(FastAPI): return JSONResponse(content=tracks) async def track_by_id_handler( - self, track_id: int, request: Request, user=Depends(get_current_user) + self, track_id: int, quality: str, request: Request, user=Depends(get_current_user) ) -> Response: """Get track by ID""" - track = await self.trip_util.get_stream_url_by_track_id(track_id) + track = await self.trip_util.get_stream_url_by_track_id(track_id, quality) if not track: return Response(status_code=404, content="Not found") return JSONResponse(content={"stream_url": track}) @@ -126,15 +157,17 @@ class RIP(FastAPI): target = data.target job = 
self.task_queue.enqueue( bulk_download, - args=(track_ids,), + args=(track_ids, data.quality,), job_timeout=14400, failure_ttl=86400, - result_ttl=86400, + result_ttl=-1, + retry=Retry(max=1, interval=[30]), meta={ 'progress': 0, 'status': 'queued', 'target': target, 'tracks_in': len(track_ids), + 'quality': data.quality, } ) @@ -143,90 +176,79 @@ class RIP(FastAPI): content={ "job_id": job.id, "status": "queued", - "target": job.meta.get("target", None) + "target": job.meta.get("target", None), + "quality": job.meta.get("quality", "Unknown"), } ) - async def job_status_handler( - self, job_id: str, request: Request, user=Depends(get_current_user) - ): + async def job_status_handler(self, job_id: str, request: Request, user=Depends(get_current_user)): """Get status and result of a single job""" + + job = None try: + # Try direct fetch first job = Job.fetch(job_id, connection=self.redis_conn) except Exception: + # If not found, try registries explicitly (in case fetch failed because the job left the queue) + registries = [ + StartedJobRegistry(queue=self.task_queue), + FinishedJobRegistry(queue=self.task_queue), + FailedJobRegistry(queue=self.task_queue), + DeferredJobRegistry(queue=self.task_queue), + ScheduledJobRegistry(queue=self.task_queue), + ] + for registry in registries: + if job_id in registry.get_job_ids(): + try: + job = Job.fetch(job_id, connection=self.redis_conn) + except Exception: + pass + break + + if job is None: return JSONResponse({"error": "Job not found"}, status_code=404) - job_status: str|JobStatus = job.get_status() - job_progress = job.meta.get("progress", 0) - job_tarball = job.meta.get("tarball") - if job_progress == 100 and not job_tarball: - job_status = "compressing" - tracks_out = len(job.meta.get("tracks", [])) - tracks_in = job.meta.get("tracks_in", None) - - return { - "id": job.id, - "status": job_status, - "result": job.result, - "enqueued_at": job.enqueued_at, - "started_at": job.started_at, - "ended_at": job.ended_at, - "progress": job_progress, - "target": job.meta.get("target"), - "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out, - "tarball": job_tarball, - } + return self._format_job(job) + async def job_list_handler(self, request: Request, user=Depends(get_current_user)): - """List all jobs in the queue (queued + finished, if result_ttl allows)""" + """List all jobs across all registries (queued, started, finished, failed, etc).""" jobs_info = [] + seen = set() - # Jobs still in the queue (pending) + # 1. Jobs still waiting in queue for job in self.task_queue.jobs: - status: str|JobStatus = job.get_status() - job_progress = job.meta.get("progress", 0) - tarball = job.meta.get("tarball") - if job_progress == 100 and not tarball: - status = "compressing" - jobs_info.append( - { - "id": job.id, - "status": status, # queued - "result": job.result, - "enqueued_at": job.enqueued_at, - "progress": job.meta.get("progress", 0), - "target": job.meta.get("target", None) - } - ) + jobs_info.append(self._format_job(job)) + seen.add(job.id) - # Running jobs tracked via enqueued_job_ids + # 2. 
Jobs in Started/Finished/Failed/Deferred registries + registries = [ + StartedJobRegistry(queue=self.task_queue), + FinishedJobRegistry(queue=self.task_queue), + FailedJobRegistry(queue=self.task_queue), + DeferredJobRegistry(queue=self.task_queue), + ] + for registry in registries: + for jid in registry.get_job_ids(): + if jid in seen: + continue + try: + job = Job.fetch(jid, connection=self.redis_conn) + jobs_info.append(self._format_job(job)) + seen.add(job.id) + except Exception: + continue # job might have been cleaned up + + # 3. Jobs tracked in your custom enqueued_job_ids list job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1) for jid_bytes in job_ids: # type: ignore jid = jid_bytes.decode() + if jid in seen: + continue try: job = Job.fetch(jid, connection=self.redis_conn) + jobs_info.append(self._format_job(job)) + seen.add(job.id) except Exception: - continue # job may have completed and expired - - job_status: str|JobStatus = job.get_status() - job_progress = job.meta.get("progress", 0) - if job_progress == 100 and not job.meta.get("tarball"): - job_status = "compressing" - if job_status in ("started", "queued", "finished", "failed", "compressing"): - # avoid duplicates for jobs already in task_queue.jobs - if not any(j["id"] == job.id for j in jobs_info): - tracks_in = job.meta.get("tracks_in", None) - tracks_out = len(job.meta.get("tracks", [])) - jobs_info.append( - { - "id": job.id, - "status": job_status, - "result": job.result, - "tarball": job.meta.get("tarball", None), - "enqueued_at": job.enqueued_at, - "progress": job.meta.get("progress", 0), - "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out, - "target": job.meta.get("target", None), - } - ) + continue return {"jobs": jobs_info} diff --git a/utils/rip_background.py b/utils/rip_background.py index 3dd8f78..8a0402f 100644 --- a/utils/rip_background.py +++ b/utils/rip_background.py @@ -39,6 +39,23 @@ sr = SRUtil() # ---------- Helpers ---------- + +def cleanup_empty_dirs(root: Path): + """ + Recursively remove any directories under root that contain no files + (empty or only empty subdirectories). 
+ """ + for dirpath, dirnames, filenames in os.walk(root, topdown=False): + p = Path(dirpath) + # Check if there are any files in this directory or subdirectories + has_file = any(f.is_file() for f in p.rglob("*")) + if not has_file: + try: + p.rmdir() # safe to remove empty dirs + except Exception: + pass + + def sanitize_filename(name: str) -> str: """Make a string safe for file/dir names.""" if not name: @@ -64,7 +81,7 @@ def ensure_unique_path(p: Path) -> Path: return parent / f"{stem}_{short_id}{suffix}" # ---------- Job ---------- -def bulk_download(track_list: list): +def bulk_download(track_list: list, quality: str = "FLAC"): """ RQ job: - fetches stream URLs @@ -118,7 +135,7 @@ def bulk_download(track_list: list): try: # 1) Stream URL - url = await sr.get_stream_url_by_track_id(track_id) + url = await sr.get_stream_url_by_track_id(track_id, quality) if not url: raise RuntimeError("No stream URL") @@ -228,7 +245,7 @@ def bulk_download(track_list: list): # Stage tarball in ROOT_DIR first staged_tarball = ROOT_DIR / f"{combined_artist}_{short_id}.tar.gz" - final_tarball = ROOT_DIR / "completed" / staged_tarball.name + final_tarball = ROOT_DIR / "completed" / quality / staged_tarball.name final_tarball.parent.mkdir(parents=True, exist_ok=True) if job: @@ -276,6 +293,7 @@ def bulk_download(track_list: list): shutil.move(str(staged_tarball), str(final_tarball)) logging.critical("Tarball finalized: %s", final_tarball) + await asyncio.to_thread(cleanup_empty_dirs, ROOT_DIR) if job: job.meta["tarball"] = str(final_tarball) job.meta["progress"] = 100 diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py index 25baa6e..e1e8e5f 100644 --- a/utils/sr_wrapper.py +++ b/utils/sr_wrapper.py @@ -4,7 +4,6 @@ from urllib.parse import urlparse import hashlib import logging import os -import asyncio import aiohttp from streamrip.client import TidalClient # type: ignore from streamrip.config import Config as StreamripConfig # type: ignore @@ -135,7 +134,8 @@ class SRUtil: logging.debug("Retrieved albums: %s", albums_out) return albums_out - async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]: + async def get_tracks_by_album_id(self, album_id: int, + quality: str = "FLAC") -> Optional[list | dict]: """Get tracks by album ID Args: album_id (int): The ID of the album. @@ -177,7 +177,7 @@ class SRUtil: return [] async def get_stream_url_by_track_id( - self, track_id: int, quality: str = "LOSSLESS" + self, track_id: int, quality: str = "FLAC" ) -> Optional[str]: """Get stream URL by track ID Args: @@ -186,19 +186,21 @@ class SRUtil: Returns: Optional[str]: The stream URL or None if not found. """ - if quality not in ["LOSSLESS", "HIGH", "LOW"]: + if quality not in ["FLAC", "Lossy"]: logging.error("Invalid quality requested: %s", quality) + return None quality_int: int = int(self.streamrip_config.session.tidal.quality) match quality: - case "HIGH": + case "FLAC": + quality_int = 2 + case "Lossy": quality_int = 1 - case "LOW": - quality_int = 0 track_id_str: str = str(track_id) await self.streamrip_client.login() try: + logging.critical("Using quality_int: %s", quality_int) track = await self.streamrip_client.get_downloadable( track_id=track_id_str, quality=quality_int )
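
Illustrative usage (not part of the patch): a minimal client-side sketch of the new quality-aware bulk download flow introduced here. The route paths ("/rip/bulk" and "/rip/job/{job_id}"), host, and auth header are assumptions, since the endpoints dict is not visible in this hunk; the request body mirrors ValidBulkFetchRequest (track_ids, target, quality in {"FLAC", "Lossy"}) and the response fields mirror RIP._format_job.

import time
import requests  # assumed client library for this sketch

BASE = "http://localhost:8000"                    # assumed host
HEADERS = {"Authorization": "Bearer <api key>"}   # assumed auth scheme

# Enqueue a bulk download job; the body matches ValidBulkFetchRequest
# (quality is now part of the request and restricted to "FLAC" or "Lossy").
resp = requests.post(
    f"{BASE}/rip/bulk",                           # assumed route
    json={"track_ids": [101, 102], "target": "office-pc", "quality": "FLAC"},
    headers=HEADERS,
    timeout=30,
)
resp.raise_for_status()
job_id = resp.json()["job_id"]

# Poll the job; the fields below come from RIP._format_job, including the
# synthetic "compressing" status reported while the tarball is being built.
while True:
    status = requests.get(
        f"{BASE}/rip/job/{job_id}",               # assumed route
        headers=HEADERS,
        timeout=30,
    ).json()
    print(status["status"], status["progress"], status["tracks"])
    if status["status"] in ("finished", "failed"):
        break
    time.sleep(5)

print("tarball:", status.get("tarball"))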