import logging import asyncio import random import os import tarfile import uuid import shutil import re from pathlib import Path from urllib.parse import urlparse, unquote import aiohttp from rq import get_current_job from utils.sr_wrapper import SRUtil # ---------- Config ---------- ROOT_DIR = Path("/storage/music2") # change to your music folder MAX_RETRIES = 3 THROTTLE_MIN = 0.3 THROTTLE_MAX = 1.0 HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/116.0.5845.97 Safari/537.36" ), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", "Connection": "keep-alive", } logging.basicConfig( level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) sr = SRUtil() # ---------- Helpers ---------- def sanitize_filename(name: str) -> str: """Make a string safe for file/dir names.""" if not name: return "Unknown" # Replace path separators first name = name.replace("/", "-").replace("\\", "-") # Remove illegal characters on common filesystems name = re.sub(r'[<>:"|?*\x00-\x1F]', "", name) # Trim spaces and trailing dots name = name.strip().strip(".") # Collapse whitespace name = re.sub(r"\s+", " ", name) # Reasonable length cap return name[:180] or "Unknown" def ensure_unique_path(p: Path) -> Path: """If path exists, append ' (n)' before extension.""" if not p.exists(): return p stem, suffix = p.stem, p.suffix parent = p.parent n = 2 while True: candidate = parent / f"{stem} ({n}){suffix}" if not candidate.exists(): return candidate n += 1 # ---------- Job ---------- def bulk_download(track_list: list, target: str): """ RQ job: - fetches stream URLs - downloads with retries + throttling - uses SR metadata to name/organize files - creates ONE tarball for all tracks, with all artist names in the filename - returns [tarball_path] """ job = get_current_job() # Initialize job meta in a JSON/pickle-safe way if job: try: job.meta["track_ids"] = [str(t) for t in (track_list or [])] job.meta["tracks"] = [] # will hold per-track dicts job.meta["progress"] = 0 job.meta["tarball"] = None job.meta["target"] = target job.save_meta() except Exception as e: logging.warning("Failed to init job.meta: %s", e) async def process_tracks(): per_track_meta = [] # list of per-track dicts (JSON-safe) all_final_files = [] # list[Path] all_artists = set() # set[str] (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True) async with aiohttp.ClientSession(headers=HEADERS) as session: total = len(track_list or []) logging.critical("Total tracks to process: %s", total) for i, track_id in enumerate(track_list or []): track_info = { "track_id": str(track_id), "status": "pending", # pending | success | failed "file_path": None, # str | None "error": None, # str | None "attempts": 0, # int } attempt = 0 while attempt < MAX_RETRIES: tmp_file = None attempt += 1 track_info["attempts"] = attempt try: # 1) Stream URL url = await sr.get_stream_url_by_track_id(track_id) if not url: raise RuntimeError("No stream URL") # 2) Extension from URL path only (no query) parsed = urlparse(url) clean_path = unquote(parsed.path) ext = Path(clean_path).suffix or ".mp3" # Unique temp file tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}") # 3) Download (chunked) async with session.get(url) as resp: resp.raise_for_status() with open(tmp_file, "wb") as f: async for chunk in resp.content.iter_chunked(64 * 1024): f.write(chunk) # 4) Metadata from SR (prefer API over tags) md = await sr.get_metadata_by_track_id(track_id) or {} artist_raw = md.get("artist") or "Unknown Artist" album_raw = md.get("album") or "Unknown Album" title_raw = md.get("song") or f"Track {track_id}" artist = sanitize_filename(artist_raw) album = sanitize_filename(album_raw) title = sanitize_filename(title_raw) all_artists.add(artist) # 5) Final path final_dir = ROOT_DIR / artist / album final_dir.mkdir(parents=True, exist_ok=True) final_file = ensure_unique_path(final_dir / f"{title}{ext}") tmp_file.rename(final_file) tmp_file = None # consumed # Track success track_info["status"] = "success" track_info["file_path"] = str(final_file) track_info["error"] = None all_final_files.append(final_file) break # success; exit retry loop except Exception as e: logging.error("Track %s attempt %s failed: %s", track_id, attempt, e) track_info["error"] = str(e) if attempt >= MAX_RETRIES: track_info["status"] = "failed" # small backoff before next attempt (or next track) await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) finally: # Clean partial temp file on failure try: if tmp_file and tmp_file.exists(): tmp_file.unlink() except Exception: pass # Update RQ meta after each track per_track_meta.append(track_info) if job: try: job.meta["tracks"] = per_track_meta job.meta["progress"] = int(((i + 1) / max(total, 1)) * 100) job.save_meta() except Exception as e: logging.warning("Failed to update job.meta after track %s: %s", track_id, e) # Throttle between tracks await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) # ---- Single combined tarball for all tracks ---- if not all_final_files: if job: try: job.meta["tarball"] = None job.meta["status"] = "failed" job.save_meta() except Exception: pass return [] # Pick artist with the most tracks artist_counts: dict[str, int] = {} for t in per_track_meta: if t["status"] == "success" and t.get("file_path"): try: artist = Path(t["file_path"]).relative_to(ROOT_DIR).parts[0] except Exception: artist = "Unknown Artist" artist_counts[artist] = artist_counts.get(artist, 0) + 1 if artist_counts: top_artist = sorted( artist_counts.items(), key=lambda kv: (-kv[1], kv[0]) )[0][0] else: top_artist = "Unknown Artist" combined_artist = sanitize_filename(top_artist) short_id = uuid.uuid4().hex[:8] # Stage tarball in ROOT_DIR first staged_tarball = ROOT_DIR / f"{combined_artist}_{short_id}.tar.gz" final_tarball = ROOT_DIR / "completed" / staged_tarball.name final_tarball.parent.mkdir(parents=True, exist_ok=True) with tarfile.open(staged_tarball, "w:gz") as tar: # Update job status → compressing if job: try: job.meta["status"] = "compressing" job.save_meta() except Exception: pass logging.info("Creating tarball: %s", staged_tarball) for f in all_final_files: try: arcname = f.relative_to(ROOT_DIR) except ValueError: arcname = f.name tar.add(f, arcname=str(arcname)) try: os.remove(f) except Exception: pass logging.critical("Tarball created: %s", staged_tarball) # Now move tarball into completed folder try: staged_tarball.rename(final_tarball) except Exception: shutil.move(str(staged_tarball), str(final_tarball)) logging.critical("Tarball finalized: %s", final_tarball) # Cleanup empty dirs (unchanged) to_check = set() for p in all_final_files: if p.parent: to_check.add(p.parent) if p.parent and p.parent.parent: to_check.add(p.parent.parent) for d in sorted(to_check, key=lambda p: len(p.parts), reverse=True): if d.is_dir(): try: next(d.iterdir()) except StopIteration: shutil.rmtree(d, ignore_errors=True) except Exception: pass # Update job status → done if job: try: job.meta["tarball"] = str(final_tarball) job.meta["progress"] = 100 job.meta["status"] = "done" job.save_meta() except Exception as e: logging.warning("Failed to write final status to job.meta: %s", e) return [str(final_tarball)] # Run async part synchronously for RQ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(process_tracks()) finally: loop.close()