import logging
import asyncio
import random
import os
import tarfile
import uuid
import shutil
import re
from pathlib import Path
from urllib.parse import urlparse, unquote

import aiohttp
from rq import get_current_job

from utils.sr_wrapper import SRUtil

# ---------- Config ----------
ROOT_DIR = Path("/storage/music2")  # change to your music folder
MAX_RETRIES = 3
THROTTLE_MIN = 0.2
THROTTLE_MAX = 1.5

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/116.0.5845.97 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Connection": "keep-alive",
}

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)

sr = SRUtil()


# ---------- Helpers ----------
def sanitize_filename(name: str) -> str:
    """Make a string safe for file/dir names."""
    if not name:
        return "Unknown"
    # Replace path separators first
    name = name.replace("/", "-").replace("\\", "-")
    # Remove illegal characters on common filesystems
    name = re.sub(r'[<>:"|?*\x00-\x1F]', "", name)
    # Trim spaces and trailing dots
    name = name.strip().strip(".")
    # Collapse whitespace
    name = re.sub(r"\s+", " ", name)
    # Reasonable length cap
    return name[:180] or "Unknown"


def ensure_unique_path(p: Path) -> Path:
    """If path exists, append ' (n)' before extension."""
    if not p.exists():
        return p
    stem, suffix = p.stem, p.suffix
    parent = p.parent
    n = 2
    while True:
        candidate = parent / f"{stem} ({n}){suffix}"
        if not candidate.exists():
            return candidate
        n += 1


# ---------- Job ----------
def bulk_download(track_list: list):
    """
    RQ job:
      - fetches stream URLs
      - downloads with retries + throttling
      - uses SR metadata to name/organize files
      - creates ONE tarball for all tracks, with all artist names in the filename
      - returns [tarball_path]
    """
    job = get_current_job()

    async def process_tracks():
        per_track_meta = []
        all_final_files: list[Path] = []
        all_artists: set[str] = set()

        (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)

        async with aiohttp.ClientSession(headers=HEADERS) as session:
            total = len(track_list)
            logging.critical("Total tracks to process: %s", total)

            for i, track_id in enumerate(track_list):
                track_info = {
                    "track_id": track_id,
                    "status": "pending",
                    "file_path": None,
                    "error": None,
                }

                attempt = 0
                while attempt < MAX_RETRIES:
                    tmp_file: Path | None = None
                    attempt += 1
                    try:
                        # 1) Stream URL
                        url = await sr.get_stream_url_by_track_id(track_id)
                        if not url:
                            raise RuntimeError("No stream URL")

                        # 2) Extension from URL path only (no query)
                        parsed = urlparse(url)
                        clean_path = unquote(parsed.path)  # path has no query; just in case we unquote
                        ext = Path(clean_path).suffix or ".mp3"

                        # Unique temp file
                        tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")

                        # 3) Download (chunked)
                        async with session.get(url) as resp:
                            resp.raise_for_status()
                            with open(tmp_file, "wb") as f:
                                async for chunk in resp.content.iter_chunked(64 * 1024):
                                    f.write(chunk)

                        # 4) Metadata from SR (prefer API over tags)
                        md = await sr.get_metadata_by_track_id(track_id) or {}
                        artist_raw = md.get("artist") or "Unknown Artist"
                        album_raw = md.get("album") or "Unknown Album"
                        title_raw = md.get("song") or f"Track {track_id}"

                        artist = sanitize_filename(artist_raw)
                        album = sanitize_filename(album_raw)
                        title = sanitize_filename(title_raw)
                        all_artists.add(artist)

                        # 5) Final path
                        final_dir = ROOT_DIR / artist / album
                        final_dir.mkdir(parents=True, exist_ok=True)
                        final_file = ensure_unique_path(final_dir / f"{title}{ext}")

                        # shutil.move instead of Path.rename: /tmp and ROOT_DIR may
                        # be on different filesystems, where rename() raises EXDEV
                        shutil.move(str(tmp_file), str(final_file))
                        tmp_file = None  # consumed

                        # Track success
                        track_info.update(
                            {"status": "success", "file_path": str(final_file)}
                        )
                        all_final_files.append(final_file)
                        break  # success; exit retry loop

                    except Exception as e:
                        logging.error("Track %s attempt %s failed: %s", track_id, attempt, e)
                        track_info["error"] = str(e)
                        if attempt >= MAX_RETRIES:
                            track_info["status"] = "failed"
                        # small backoff before next attempt (or next track)
                        await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
                    finally:
                        # Clean partial temp file on failure
                        if tmp_file and tmp_file.exists():
                            try:
                                tmp_file.unlink()
                            except Exception:
                                pass

                # Update RQ meta after each track
                per_track_meta.append(track_info)
                if job:
                    job.meta["progress"] = int((i + 1) / max(total, 1) * 100)
                    job.meta["tracks"] = track_list
                    job.save_meta()

                # Throttle between tracks
                await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))

        # ---- Single combined tarball for all tracks ----
        if not all_final_files:
            # nothing succeeded
            return []

        combined_artists = sanitize_filename(" & ".join(sorted(all_artists))) or "Unknown Artist"
        short_id = uuid.uuid4().hex[:8]
        tarball_path = ROOT_DIR / "completed" / f"{combined_artists}_{short_id}.tar.gz"
        tarball_path.parent.mkdir(parents=True, exist_ok=True)

        with tarfile.open(tarball_path, "w:gz") as tar:
            for f in all_final_files:
                # Preserve relative Artist/Album/Song.ext structure inside the tar
                try:
                    arcname = f.relative_to(ROOT_DIR)
                except ValueError:
                    arcname = f.name  # fallback
                tar.add(f, arcname=str(arcname))
                # remove original file after adding
                try:
                    os.remove(f)
                except Exception:
                    pass

        logging.critical("Created tarball: %s", tarball_path)

        # Cleanup empty artist/album dirs (best-effort)
        # Remove any directories under ROOT_DIR that are now empty
        to_check = {p.parent for p in all_final_files} | {p.parent.parent for p in all_final_files}
        for d in sorted(to_check, key=lambda p: len(p.parts), reverse=True):
            if d.is_dir():
                try:
                    # remove only if empty
                    next(d.iterdir())
                except StopIteration:
                    # empty
                    shutil.rmtree(d, ignore_errors=True)
                except Exception:
                    pass

        return [str(tarball_path)]

    # Run async part synchronously for RQ
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(process_tracks())
    finally:
        loop.close()