From e0f64f6773ade4b0b2b27d56fec1cd7c02179849 Mon Sep 17 00:00:00 2001
From: codey
Date: Wed, 20 Aug 2025 15:58:07 -0400
Subject: [PATCH] rip: track bulk_download progress/status in job.meta and
 build the combined tarball off the event loop

---
 endpoints/rip.py        |  50 +++++++++++++++----
 utils/rip_background.py | 108 ++++++++++++++++++++--------------------
 2 files changed, 93 insertions(+), 65 deletions(-)

diff --git a/endpoints/rip.py b/endpoints/rip.py
index a539085..a29dc07 100644
--- a/endpoints/rip.py
+++ b/endpoints/rip.py
@@ -7,6 +7,7 @@ from auth.deps import get_current_user
 from redis import Redis
 from rq import Queue
 from rq.job import Job
+from rq.job import JobStatus
 from utils.rip_background import bulk_download
 from lyric_search.sources import private
 from pydantic import BaseModel
@@ -125,16 +126,24 @@ class RIP(FastAPI):
         target = data.target
         job = self.task_queue.enqueue(
             bulk_download,
-            args=(track_ids, target),
+            args=(track_ids,),
             job_timeout=14400,
             failure_ttl=86400,
             result_ttl=86400,
+            meta={
+                'progress': 0,
+                'status': 'queued',
+                'target': target,
+                'tracks_in': len(track_ids),
+            }
         )
         self.redis_conn.lpush("enqueued_job_ids", job.id)
         return JSONResponse(
             content={
                 "job_id": job.id,
                 "status": "queued",
+                "target": job.meta.get("target", None)
             }
         )
@@ -146,16 +155,25 @@ class RIP(FastAPI):
             job = Job.fetch(job_id, connection=self.redis_conn)
         except Exception:
             return JSONResponse({"error": "Job not found"}, status_code=404)
+        job_status: str|JobStatus = job.get_status()
+        job_progress = job.meta.get("progress", 0)
+        job_tarball = job.meta.get("tarball")
+        if job_progress == 100 and not job_tarball:
+            job_status = "compressing"
+        tracks_out = len(job.meta.get("tracks", []))
+        tracks_in = job.meta.get("tracks_in", None)
         return {
             "id": job.id,
-            "status": job.get_status(),
+            "status": job_status,
             "result": job.result,
             "enqueued_at": job.enqueued_at,
             "started_at": job.started_at,
             "ended_at": job.ended_at,
-            "progress": job.meta.get("progress"),
+            "progress": job_progress,
             "target": job.meta.get("target"),
+            "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out,
+            "tarball": job_tarball,
         }
 
     async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
@@ -164,18 +182,23 @@ class RIP(FastAPI):
 
         # Jobs still in the queue (pending)
         for job in self.task_queue.jobs:
+            status: str|JobStatus = job.get_status()
+            job_progress = job.meta.get("progress", 0)
+            tarball = job.meta.get("tarball")
+            if job_progress == 100 and not tarball:
+                status = "compressing"
             jobs_info.append(
                 {
                     "id": job.id,
-                    "status": job.get_status(),  # queued
+                    "status": status,  # queued
                     "result": job.result,
                     "enqueued_at": job.enqueued_at,
-                    "progress": job.meta.get("progress", None),
+                    "progress": job.meta.get("progress", 0),
                     "target": job.meta.get("target", None)
                 }
             )
 
-        # Started/running jobs tracked via enqueued_job_ids
+        # Running jobs tracked via enqueued_job_ids
         job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1)
         for jid_bytes in job_ids:  # type: ignore
             jid = jid_bytes.decode()
@@ -184,19 +207,24 @@ class RIP(FastAPI):
             except Exception:
                 continue  # job may have completed and expired
-            status = job.get_status()
-            if status in ("started", "queued", "finished"):
+            job_status: str|JobStatus = job.get_status()
+            job_progress = job.meta.get("progress", 0)
+            if job_progress == 100 and not job.meta.get("tarball"):
+                job_status = "compressing"
+            if job_status in ("started", "queued", "finished", "failed", "compressing"):
                 # avoid duplicates for jobs already in task_queue.jobs
                 if not any(j["id"] == job.id for j in jobs_info):
+                    tracks_in = job.meta.get("tracks_in", None)
+                    tracks_out = len(job.meta.get("tracks", []))
                     jobs_info.append(
                         {
                             "id": job.id,
-                            "status": status,
+                            "status": job_status,
                             "result": job.result,
                             "tarball": job.meta.get("tarball", None),
                             "enqueued_at": job.enqueued_at,
-                            "progress": job.meta.get("progress", None),
-                            "tracks": job.meta.get("tracks", None),
+                            "progress": job.meta.get("progress", 0),
+                            "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out,
                             "target": job.meta.get("target", None),
                         }
                     )
diff --git a/utils/rip_background.py b/utils/rip_background.py
index 10613c0..3dd8f78 100644
--- a/utils/rip_background.py
+++ b/utils/rip_background.py
@@ -14,7 +14,7 @@ from rq import get_current_job
 from utils.sr_wrapper import SRUtil
 
 # ---------- Config ----------
-ROOT_DIR = Path("/storage/music2")  # change to your music folder
+ROOT_DIR = Path("/storage/music2")
 MAX_RETRIES = 3
 THROTTLE_MIN = 0.3
 THROTTLE_MAX = 1.0
@@ -56,26 +56,21 @@ def sanitize_filename(name: str) -> str:
 
 def ensure_unique_path(p: Path) -> Path:
-    """If path exists, append ' (n)' before extension."""
-    if not p.exists():
-        return p
+    """Always append a short UUID fragment before the extension."""
     stem, suffix = p.stem, p.suffix
     parent = p.parent
-    n = 2
-    while True:
-        candidate = parent / f"{stem} ({n}){suffix}"
-        if not candidate.exists():
-            return candidate
-        n += 1
+
+    short_id = uuid.uuid4().hex[:8]
+    return parent / f"{stem}_{short_id}{suffix}"
 
 # ---------- Job ----------
-def bulk_download(track_list: list, target: str):
+def bulk_download(track_list: list):
     """
     RQ job:
       - fetches stream URLs
       - downloads with retries + throttling
       - uses SR metadata to name/organize files
-      - creates ONE tarball for all tracks, with all artist names in the filename
+      - creates ONE tarball for all tracks
       - returns [tarball_path]
     """
     job = get_current_job()
@@ -87,7 +82,7 @@ def bulk_download(track_list: list, target: str):
             job.meta["tracks"] = []  # will hold per-track dicts
             job.meta["progress"] = 0
             job.meta["tarball"] = None
-            job.meta["target"] = target
+            job.meta["status"] = "started"
             job.save_meta()
         except Exception as e:
             logging.warning("Failed to init job.meta: %s", e)
@@ -102,6 +97,9 @@ def bulk_download(track_list: list, target: str):
     async with aiohttp.ClientSession(headers=HEADERS) as session:
         total = len(track_list or [])
         logging.critical("Total tracks to process: %s", total)
+        if job:
+            job.meta["progress"] = 0
+            job.save_meta()
 
         for i, track_id in enumerate(track_list or []):
             track_info = {
@@ -164,6 +162,10 @@ def bulk_download(track_list: list, target: str):
                         track_info["file_path"] = str(final_file)
                         track_info["error"] = None
                         all_final_files.append(final_file)
+
+                        if job:
+                            job.meta["progress"] = int(((i + 1) / total) * 100)
+                            job.save_meta()
                         break  # success; exit retry loop
 
                     except Exception as e:
@@ -186,7 +188,6 @@ def bulk_download(track_list: list, target: str):
             if job:
                 try:
                     job.meta["tracks"] = per_track_meta
-                    job.meta["progress"] = int(((i + 1) / max(total, 1)) * 100)
                     job.save_meta()
                 except Exception as e:
                     logging.warning("Failed to update job.meta after track %s: %s", track_id, e)
@@ -194,7 +195,7 @@ def bulk_download(track_list: list, target: str):
                 # Throttle between tracks
                 await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
 
-            # ---- Single combined tarball for all tracks ----
+        # ---- Single combined tarball for all tracks ----
         if not all_final_files:
             if job:
                 try:
@@ -230,26 +231,41 @@ def bulk_download(track_list: list, target: str):
         final_tarball = ROOT_DIR / "completed" / staged_tarball.name
         final_tarball.parent.mkdir(parents=True, exist_ok=True)
+        if job:
+            try:
+                job.meta["status"] = "compressing"
+                job.save_meta()
+            except Exception:
+                pass
 
-        with tarfile.open(staged_tarball, "w:gz") as tar:
-            # Update job status → compressing
+        logging.info("Creating tarball: %s", staged_tarball)
+
+        # Run blocking tar creation in background thread
+        def _create_tar_sync():
+            with tarfile.open(staged_tarball, "w:gz") as tar:
+                for f in all_final_files:
+                    try:
+                        arcname = f.relative_to(ROOT_DIR)
+                    except ValueError:
+                        arcname = f.name
+                    tar.add(f, arcname=str(arcname))
+                    try:
+                        os.remove(f)
+                    except Exception:
+                        pass
+
+        await asyncio.to_thread(_create_tar_sync)
+
+        # sanity check
+        if not staged_tarball.exists():
+            logging.error("Tarball was not created: %s", staged_tarball)
             if job:
                 try:
-                    job.meta["status"] = "compressing"
+                    job.meta["status"] = "compress_failed"
                     job.save_meta()
-                except Exception:
-                    pass
-            logging.info("Creating tarball: %s", staged_tarball)
-            for f in all_final_files:
-                try:
-                    arcname = f.relative_to(ROOT_DIR)
-                except ValueError:
-                    arcname = f.name
-                tar.add(f, arcname=str(arcname))
-                try:
-                    os.remove(f)
                 except Exception:
                     pass
+            return []
 
         logging.critical("Tarball created: %s", staged_tarball)
@@ -260,32 +276,11 @@ def bulk_download(track_list: list, target: str):
             shutil.move(str(staged_tarball), str(final_tarball))
 
         logging.critical("Tarball finalized: %s", final_tarball)
-
-        # Cleanup empty dirs (unchanged)
-        to_check = set()
-        for p in all_final_files:
-            if p.parent:
-                to_check.add(p.parent)
-            if p.parent and p.parent.parent:
-                to_check.add(p.parent.parent)
-        for d in sorted(to_check, key=lambda p: len(p.parts), reverse=True):
-            if d.is_dir():
-                try:
-                    next(d.iterdir())
-                except StopIteration:
-                    shutil.rmtree(d, ignore_errors=True)
-                except Exception:
-                    pass
-
-        # Update job status → done
         if job:
-            try:
-                job.meta["tarball"] = str(final_tarball)
-                job.meta["progress"] = 100
-                job.meta["status"] = "done"
-                job.save_meta()
-            except Exception as e:
-                logging.warning("Failed to write final status to job.meta: %s", e)
+            job.meta["tarball"] = str(final_tarball)
+            job.meta["progress"] = 100
+            job.meta["status"] = "completed"
+            job.save_meta()
 
         return [str(final_tarball)]
@@ -294,5 +289,10 @@ def bulk_download(track_list: list, target: str):
     asyncio.set_event_loop(loop)
     try:
         return loop.run_until_complete(process_tracks())
+    except Exception as e:
+        if job:
+            job.meta["status"] = "failed"
+            job.save_meta()
+        logging.critical("Exception: %s", str(e))
     finally:
         loop.close()
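
Note for reviewers (not part of the patch): a minimal sketch of how a consumer could read back the job.meta keys this change maintains (progress, tarball, tracks, tracks_in, target) and derive the same synthetic "compressing" state that job_status_handler reports. It assumes the consumer connects to the same Redis instance the RQ queue uses and already knows the job id; the helper name is hypothetical.

    import redis
    from rq.job import Job

    def job_snapshot(job_id: str) -> dict:
        """Summarize an RQ job using the meta keys that bulk_download maintains."""
        conn = redis.Redis()  # assumption: the same Redis the queue/endpoints use
        job = Job.fetch(job_id, connection=conn)

        status = job.get_status()
        progress = job.meta.get("progress", 0)
        tarball = job.meta.get("tarball")
        # All tracks fetched but the combined tarball not written yet -> "compressing",
        # mirroring the check in job_status_handler / job_list_handler.
        if progress == 100 and not tarball:
            status = "compressing"

        tracks_in = job.meta.get("tracks_in")
        tracks_out = len(job.meta.get("tracks", []))
        return {
            "id": job.id,
            "status": status,
            "progress": progress,
            "tarball": tarball,
            "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out,
            "target": job.meta.get("target"),
        }

Polling this until "tarball" is non-null mirrors what the endpoints expose to clients.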