From 81f79dea1ec0aa3c1c05afd1d7e69c25764bbf1e Mon Sep 17 00:00:00 2001
From: codey
Date: Wed, 20 Aug 2025 07:32:57 -0400
Subject: [PATCH] misc

---
 endpoints/rip.py        |  17 +++--
 utils/radio_util.py     |   3 +-
 utils/rip_background.py | 158 +++++++++++++++++++++++++++++-----------
 utils/sr_wrapper.py     |  22 +++---
 4 files changed, 137 insertions(+), 63 deletions(-)

diff --git a/endpoints/rip.py b/endpoints/rip.py
index 7eae60b..a539085 100644
--- a/endpoints/rip.py
+++ b/endpoints/rip.py
@@ -14,6 +14,7 @@ from pydantic import BaseModel
 
 class ValidBulkFetchRequest(BaseModel):
     track_ids: list[int]
+    target: str
 
 
 class RIP(FastAPI):
@@ -35,7 +36,7 @@
         self.task_queue = Queue(
             "dls",
             connection=self.redis_conn,
-            default_timeout=3600,
+            default_timeout=14400,
             default_result_ttl=86400,
             default_failure_ttl=86400,
         )
@@ -113,7 +114,7 @@
         user=Depends(get_current_user),
     ) -> Response:
         """Bulk fetch a list of track IDs"""
-        if not data or not data.track_ids:
+        if not data or not data.track_ids or not data.target:
             return JSONResponse(
                 content={
                     "err": True,
@@ -121,10 +122,11 @@
                 }
             )
         track_ids = data.track_ids
+        target = data.target
         job = self.task_queue.enqueue(
             bulk_download,
-            args=(track_ids,),
-            job_timeout=3600,
+            args=(track_ids, target),
+            job_timeout=14400,
             failure_ttl=86400,
             result_ttl=86400,
         )
@@ -152,6 +154,8 @@
             "enqueued_at": job.enqueued_at,
             "started_at": job.started_at,
             "ended_at": job.ended_at,
+            "progress": job.meta.get("progress"),
+            "target": job.meta.get("target"),
         }
 
     async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
@@ -167,6 +171,7 @@
                     "result": job.result,
                     "enqueued_at": job.enqueued_at,
                     "progress": job.meta.get("progress", None),
+                    "target": job.meta.get("target", None)
                 }
             )
 
@@ -188,9 +193,11 @@
                     "id": job.id,
                     "status": status,
                     "result": job.result,
+                    "tarball": job.meta.get("tarball", None),
                     "enqueued_at": job.enqueued_at,
                     "progress": job.meta.get("progress", None),
-                    "tracks": job.meta.get("track_list", None),
+                    "tracks": job.meta.get("tracks", None),
+                    "target": job.meta.get("target", None),
                 }
             )
 
diff --git a/utils/radio_util.py b/utils/radio_util.py
index 1465372..4b672c3 100644
--- a/utils/radio_util.py
+++ b/utils/radio_util.py
@@ -394,8 +394,7 @@
         _playlist = await self.redis_client.json().get(playlist_redis_key)  # type: ignore
         if playlist not in self.active_playlist.keys():
             self.active_playlist[playlist] = []
-        if not playlist == "rock":
-            random.shuffle(_playlist)  # Temp/for Cocteau Twins
+        random.shuffle(_playlist)
         self.active_playlist[playlist] = [
             {
                 "uuid": str(uuid().hex),
diff --git a/utils/rip_background.py b/utils/rip_background.py
index f7e5e64..10613c0 100644
--- a/utils/rip_background.py
+++ b/utils/rip_background.py
@@ -16,8 +16,8 @@ from utils.sr_wrapper import SRUtil
 # ---------- Config ----------
 ROOT_DIR = Path("/storage/music2")  # change to your music folder
 MAX_RETRIES = 3
-THROTTLE_MIN = 0.2
-THROTTLE_MAX = 1.5
+THROTTLE_MIN = 0.3
+THROTTLE_MAX = 1.0
 
 HEADERS = {
     "User-Agent": (
@@ -68,44 +68,58 @@ def ensure_unique_path(p: Path) -> Path:
             return candidate
         n += 1
 
 
-
 # ---------- Job ----------
-def bulk_download(track_list: list):
+def bulk_download(track_list: list, target: str):
     """
     RQ job:
       - fetches stream URLs
       - downloads tracks
       - moves to final dirs
       - creates ONE tarball for everything
       - updates job.meta["tarball"]
       - returns [tarball_path]
     """
     job = get_current_job()
 
+    # Initialize job meta in a JSON/pickle-safe way
+    if job:
+        try:
+            job.meta["track_ids"] = [str(t) for t in (track_list or [])]
+            job.meta["tracks"] = []  # will hold per-track dicts
+            job.meta["progress"] = 0
+            job.meta["tarball"] = None
+            job.meta["target"] = target
+            job.save_meta()
+        except Exception as e:
+            logging.warning("Failed to init job.meta: %s", e)
+
     async def process_tracks():
-        per_track_meta = []
-        all_final_files: list[Path] = []
-        all_artists: set[str] = set()
+        per_track_meta = []  # list of per-track dicts (JSON-safe)
+        all_final_files = []  # list[Path]
+        all_artists = set()  # set[str]
 
         (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
 
         async with aiohttp.ClientSession(headers=HEADERS) as session:
-            total = len(track_list)
+            total = len(track_list or [])
             logging.critical("Total tracks to process: %s", total)
 
-            for i, track_id in enumerate(track_list):
+            for i, track_id in enumerate(track_list or []):
                 track_info = {
-                    "track_id": track_id,
-                    "status": "pending",
-                    "file_path": None,
-                    "error": None,
+                    "track_id": str(track_id),
+                    "status": "pending",  # pending | success | failed
+                    "file_path": None,  # str | None
+                    "error": None,  # str | None
+                    "attempts": 0,  # int
                 }
 
                 attempt = 0
                 while attempt < MAX_RETRIES:
-                    tmp_file: Path | None = None
+                    tmp_file = None
                     attempt += 1
+                    track_info["attempts"] = attempt
+
                     try:
                         # 1) Stream URL
                         url = await sr.get_stream_url_by_track_id(track_id)
@@ -112,7 +126,7 @@
 
                         # 2) Extension from URL path only (no query)
                         parsed = urlparse(url)
-                        clean_path = unquote(parsed.path)  # path has no query; just in case we unquote
+                        clean_path = unquote(parsed.path)
                         ext = Path(clean_path).suffix or ".mp3"
 
                         # Unique temp file
@@ -128,8 +142,8 @@
                         # 4) Metadata from SR (prefer API over tags)
                         md = await sr.get_metadata_by_track_id(track_id) or {}
                         artist_raw = md.get("artist") or "Unknown Artist"
-                        album_raw  = md.get("album") or "Unknown Album"
-                        title_raw  = md.get("song") or f"Track {track_id}"
+                        album_raw = md.get("album") or "Unknown Album"
+                        title_raw = md.get("song") or f"Track {track_id}"
 
                         artist = sanitize_filename(artist_raw)
                         album = sanitize_filename(album_raw)
@@ -146,9 +160,9 @@
                         tmp_file = None  # consumed
 
                         # Track success
-                        track_info.update(
-                            {"status": "success", "file_path": str(final_file)}
-                        )
+                        track_info["status"] = "success"
+                        track_info["file_path"] = str(final_file)
+                        track_info["error"] = None
                         all_final_files.append(final_file)
 
                         break  # success; exit retry loop
@@ -161,63 +175,119 @@
                         await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
                     finally:
                         # Clean partial temp file on failure
-                        if tmp_file and tmp_file.exists():
-                            try:
+                        try:
+                            if tmp_file and tmp_file.exists():
                                 tmp_file.unlink()
-                            except Exception:
-                                pass
+                        except Exception:
+                            pass
 
                 # Update RQ meta after each track
                 per_track_meta.append(track_info)
                 if job:
-                    job.meta["progress"] = int((i + 1) / max(total, 1) * 100)
-                    job.meta["tracks"] = track_list
-                    job.save_meta()
+                    try:
+                        job.meta["tracks"] = per_track_meta
+                        job.meta["progress"] = int(((i + 1) / max(total, 1)) * 100)
+                        job.save_meta()
+                    except Exception as e:
+                        logging.warning("Failed to update job.meta after track %s: %s", track_id, e)
 
                 # Throttle between tracks
                 await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
 
-        # ---- Single combined tarball for all tracks ----
+            # ---- Single combined tarball for all tracks ----
         if not all_final_files:
-            # nothing succeeded
+            if job:
+                try:
+                    job.meta["tarball"] = None
+                    job.meta["status"] = "failed"
+                    job.save_meta()
+                except Exception:
+                    pass
             return []
 
-        combined_artists = sanitize_filename(" & ".join(sorted(all_artists))) or "Unknown Artist"
-        short_id = uuid.uuid4().hex[:8]
-        tarball_path = (ROOT_DIR / "completed" / f"{combined_artists}_{short_id}.tar.gz")
-        tarball_path.parent.mkdir(parents=True, exist_ok=True)
+        # Pick artist with the most tracks
+        artist_counts: dict[str, int] = {}
+        for t in per_track_meta:
+            if t["status"] == "success" and t.get("file_path"):
+                try:
+                    artist = Path(t["file_path"]).relative_to(ROOT_DIR).parts[0]
+                except Exception:
+                    artist = "Unknown Artist"
+                artist_counts[artist] = artist_counts.get(artist, 0) + 1
 
-        with tarfile.open(tarball_path, "w:gz") as tar:
+        if artist_counts:
+            top_artist = sorted(
+                artist_counts.items(), key=lambda kv: (-kv[1], kv[0])
+            )[0][0]
+        else:
+            top_artist = "Unknown Artist"
+
+        combined_artist = sanitize_filename(top_artist)
+        short_id = uuid.uuid4().hex[:8]
+
+        # Stage tarball in ROOT_DIR first
+        staged_tarball = ROOT_DIR / f"{combined_artist}_{short_id}.tar.gz"
+        final_tarball = ROOT_DIR / "completed" / staged_tarball.name
+        final_tarball.parent.mkdir(parents=True, exist_ok=True)
+
+
+        with tarfile.open(staged_tarball, "w:gz") as tar:
+            # Update job status → compressing
+            if job:
+                try:
+                    job.meta["status"] = "compressing"
+                    job.save_meta()
+                except Exception:
+                    pass
+            logging.info("Creating tarball: %s", staged_tarball)
             for f in all_final_files:
-                # Preserve relative Artist/Album/Song.ext structure inside the tar
                 try:
                     arcname = f.relative_to(ROOT_DIR)
                 except ValueError:
-                    arcname = f.name  # fallback
+                    arcname = f.name
                 tar.add(f, arcname=str(arcname))
-                # remove original file after adding
                 try:
                     os.remove(f)
                 except Exception:
                     pass
 
-        logging.critical("Created tarball: %s", tarball_path)
+        logging.critical("Tarball created: %s", staged_tarball)
 
-        # Cleanup empty artist/album dirs (best-effort)
-        # Remove any directories under ROOT_DIR that are now empty
-        to_check = {p.parent for p in all_final_files} | {p.parent.parent for p in all_final_files}
+        # Now move tarball into completed folder
+        try:
+            staged_tarball.rename(final_tarball)
+        except Exception:
+            shutil.move(str(staged_tarball), str(final_tarball))
+
+        logging.critical("Tarball finalized: %s", final_tarball)
+
+        # Cleanup empty dirs (unchanged)
+        to_check = set()
+        for p in all_final_files:
+            if p.parent:
+                to_check.add(p.parent)
+            if p.parent and p.parent.parent:
+                to_check.add(p.parent.parent)
         for d in sorted(to_check, key=lambda p: len(p.parts), reverse=True):
             if d.is_dir():
                 try:
-                    # remove only if empty
                     next(d.iterdir())
                 except StopIteration:
-                    # empty
                     shutil.rmtree(d, ignore_errors=True)
                 except Exception:
                     pass
 
-        return [str(tarball_path)]
+        # Update job status → done
+        if job:
+            try:
+                job.meta["tarball"] = str(final_tarball)
+                job.meta["progress"] = 100
+                job.meta["status"] = "done"
+                job.save_meta()
+            except Exception as e:
+                logging.warning("Failed to write final status to job.meta: %s", e)
+
+        return [str(final_tarball)]
 
     # Run async part synchronously for RQ
     loop = asyncio.new_event_loop()
diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py
index fe4bb18..25baa6e 100644
--- a/utils/sr_wrapper.py
+++ b/utils/sr_wrapper.py
@@ -44,7 +44,6 @@
         )
         self.streamrip_config
         self.streamrip_client = TidalClient(self.streamrip_config)
-        asyncio.get_event_loop().create_task(self.streamrip_client.login())
 
     def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
         deduped = {}
@@ -68,8 +67,11 @@
             Optional[dict]: The artist details or None if not found.
         """
-        if not self.streamrip_client.logged_in:
+        try:
             await self.streamrip_client.login()
+        except Exception as e:
+            logging.info("Login Exception: %s", str(e))
+            pass
 
         artists_out: list[dict] = []
         try:
             artists = await self.streamrip_client.search(
@@ -80,6 +82,7 @@
             artists = await self.streamrip_client.search(
                 media_type="artist", query=artist_name
             )
+        logging.critical("Artists output: %s", artists)
         artists = artists[0].get("items", [])
         if not artists:
             logging.warning("No artist found for name: %s", artist_name)
@@ -105,8 +108,7 @@
         artist_id_str: str = str(artist_id)
         albums_out: list[dict] = []
         try:
-            if not self.streamrip_client.logged_in:
-                await self.streamrip_client.login()
+            await self.streamrip_client.login()
             metadata = await self.streamrip_client.get_metadata(
                 item_id=artist_id_str, media_type="artist"
             )
@@ -141,8 +143,7 @@
             Optional[list[dict]]: List of tracks or None if not found.
         """
         album_id_str = str(album_id)
-        if not self.streamrip_client.logged_in:
-            await self.streamrip_client.login()
+        await self.streamrip_client.login()
         metadata = await self.streamrip_client.get_metadata(
             item_id=album_id_str, media_type="album"
         )
@@ -195,8 +196,7 @@
             quality_int = 0
         track_id_str: str = str(track_id)
 
-        if not self.streamrip_client.logged_in:
-            await self.streamrip_client.login()
+        await self.streamrip_client.login()
 
         try:
             track = await self.streamrip_client.get_downloadable(
@@ -217,9 +217,8 @@
         return stream_url
 
     async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
-        if not self.streamrip_client.logged_in:
-            await self.streamrip_client.login()
         try:
+            await self.streamrip_client.login()
             metadata = await self.streamrip_client.get_metadata(str(track_id), "track")
             return {
                 "artist": metadata.get("artist", {}).get("name", "Unknown Artist"),
@@ -240,9 +239,8 @@
         Returns:
             bool
         """
-        if not self.streamrip_client.logged_in:
-            await self.streamrip_client.login()
         try:
+            await self.streamrip_client.login()
             track_url = await self.get_stream_url_by_track_id(track_id)
             if not track_url:
                 return False