# api/utils/rip_background.py
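"""
Background RQ job for ripping tracks: fetches stream URLs via SRUtil,
downloads each track with retries and throttling, names files from SR
metadata (Artist/Album/Title), and packages everything into a single
.tar.gz under /storage/music2/completed/<quality>/.
"""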
import logging
import asyncio
import random
import os
import tarfile
import uuid
import subprocess
import shutil
import re
from pathlib import Path
from urllib.parse import urlparse, unquote
import aiohttp
from rq import get_current_job
from utils.sr_wrapper import SRUtil
# ---------- Config ----------
ROOT_DIR = Path("/storage/music2")
MAX_RETRIES = 3
THROTTLE_MIN = 0.3
THROTTLE_MAX = 1.0
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/116.0.5845.97 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Connection": "keep-alive",
}

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
sr = SRUtil()
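# SRUtil (from utils.sr_wrapper) supplies the async helpers used below:
# get_stream_url_by_track_id() and get_metadata_by_track_id().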
# ---------- Helpers ----------
def cleanup_empty_dirs(root: Path):
    """
    Recursively remove any directories under root that contain no files
    (empty or only empty subdirectories).
    """
    for dirpath, dirnames, filenames in os.walk(root, topdown=False):
        p = Path(dirpath)
        # Check if there are any files in this directory or its subdirectories
        has_file = any(f.is_file() for f in p.rglob("*"))
        if not has_file:
            try:
                p.rmdir()  # safe to remove empty dirs
            except Exception:
                pass
def sanitize_filename(name: str) -> str:
    """Make a string safe for file/dir names."""
    if not name:
        return "Unknown"
    # Replace path separators first
    name = name.replace("/", "-").replace("\\", "-")
    # Remove illegal characters on common filesystems
    name = re.sub(r'[<>:"|?*\x00-\x1F]', "", name)
    # Trim spaces and trailing dots
    name = name.strip().strip(".")
    # Collapse whitespace
    name = re.sub(r"\s+", " ", name)
    # Reasonable length cap
    return name[:180] or "Unknown"
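# Illustrative example (not in the original source):
#   sanitize_filename('AC/DC: Live?')  -> 'AC-DC Live'
#   sanitize_filename('')              -> 'Unknown'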
def ensure_unique_path(p: Path) -> Path:
    """
    Ensure the given file or directory path is unique.
    If a conflict exists, append (2), (3), ... until it's unique.
    """
    parent = p.parent
    stem, suffix = p.stem, p.suffix

    # If suffix is empty → directory case
    if p.is_dir() or suffix == "":
        candidate = parent / stem
        counter = 2
        while candidate.exists():
            candidate = parent / f"{stem} ({counter})"
            counter += 1
        return candidate

    # File case
    candidate = parent / f"{stem}{suffix}"
    counter = 2
    while candidate.exists():
        candidate = parent / f"{stem} ({counter}){suffix}"
        counter += 1
    return candidate
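# Illustrative example (not in the original source): if "Song.flac" already
# exists in the target album folder, ensure_unique_path(.../"Song.flac")
# returns ".../Song (2).flac", then ".../Song (3).flac", and so on.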
# ---------- Job ----------
def bulk_download(track_list: list, quality: str = "FLAC"):
    """
    RQ job:
      - fetches stream URLs
      - downloads with retries + throttling
      - uses SR metadata to name/organize files
      - creates ONE tarball for all tracks
      - returns [tarball_path]
    """
    job = get_current_job()

    # Initialize job meta in a JSON/pickle-safe way
    if job:
        try:
            job.meta["track_ids"] = [str(t) for t in (track_list or [])]
            job.meta["tracks"] = []  # will hold per-track dicts
            job.meta["progress"] = 0
            job.meta["tarball"] = None
            job.meta["status"] = "Started"
            job.save_meta()
        except Exception as e:
            logging.warning("Failed to init job.meta: %s", e)
    async def process_tracks():
        per_track_meta = []   # list of per-track dicts (JSON-safe)
        all_final_files = []  # list[Path]
        all_artists = set()   # set[str]

        (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)

        async with aiohttp.ClientSession(headers=HEADERS) as session:
            total = len(track_list or [])
            logging.critical("Total tracks to process: %s", total)
            if job:
                job.meta["progress"] = 0
                job.save_meta()

            for i, track_id in enumerate(track_list or []):
                track_info = {
                    "track_id": str(track_id),
                    "status": "Pending",  # Pending | Success | Failed
                    "file_path": None,    # str | None
                    "error": None,        # str | None
                    "attempts": 0,        # int
                }
                attempt = 0

                while attempt < MAX_RETRIES:
                    tmp_file = None
                    attempt += 1
                    track_info["attempts"] = attempt
                    try:
                        # 1) Stream URL
                        url = await sr.get_stream_url_by_track_id(track_id, quality)
                        if not url:
                            raise RuntimeError("No stream URL")

                        # 2) Extension from URL path only (no query)
                        parsed = urlparse(url)
                        clean_path = unquote(parsed.path)
                        ext = Path(clean_path).suffix or ".mp3"

                        # Unique temp file
                        tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")

                        # 3) Download (chunked)
                        async with session.get(url) as resp:
                            resp.raise_for_status()
                            with open(tmp_file, "wb") as f:
                                async for chunk in resp.content.iter_chunked(64 * 1024):
                                    f.write(chunk)

                        # 4) Metadata from SR (prefer API over tags)
                        md = await sr.get_metadata_by_track_id(track_id) or {}
                        artist_raw = md.get("artist") or "Unknown Artist"
                        album_raw = md.get("album") or "Unknown Album"
                        title_raw = md.get("song") or f"Track {track_id}"

                        artist = sanitize_filename(artist_raw)
                        album = sanitize_filename(album_raw)
                        title = sanitize_filename(title_raw)
                        all_artists.add(artist)

                        # 5) Final path
                        artist_dir = ROOT_DIR / artist
                        album_dir = artist_dir / album
                        album_dir.mkdir(parents=True, exist_ok=True)
                        # Only ensure uniqueness at the file level
                        final_file = ensure_unique_path(album_dir / f"{title}{ext}")
                        # shutil.move handles the case where /tmp and ROOT_DIR are on
                        # different filesystems (Path.rename would raise EXDEV there)
                        shutil.move(str(tmp_file), str(final_file))
                        tmp_file = None  # consumed
                        # Track success
                        track_info["status"] = "Success"
                        track_info["file_path"] = str(final_file)
                        track_info["error"] = None
                        all_final_files.append(final_file)

                        if job:
                            job.meta["progress"] = int(((i + 1) / total) * 100)
                            job.save_meta()

                        break  # success; exit retry loop

                    except Exception as e:
                        logging.error(
                            "Track %s attempt %s failed: %s", track_id, attempt, e
                        )
                        track_info["error"] = str(e)
                        if attempt >= MAX_RETRIES:
                            track_info["status"] = "Failed"
                        # small backoff before next attempt (or next track)
                        await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
                    finally:
                        # Clean partial temp file on failure
                        try:
                            if tmp_file and tmp_file.exists():
                                tmp_file.unlink()
                        except Exception:
                            pass

                # Update RQ meta after each track
                per_track_meta.append(track_info)
                if job:
                    try:
                        job.meta["tracks"] = per_track_meta
                        job.save_meta()
                    except Exception as e:
                        logging.warning(
                            "Failed to update job.meta after track %s: %s", track_id, e
                        )

                # Throttle between tracks
                await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
        # ---- Single combined tarball for all tracks ----
        if not all_final_files:
            if job:
                try:
                    job.meta["tarball"] = None
                    job.meta["status"] = "Failed"
                    job.save_meta()
                except Exception:
                    pass
            return []

        # Pick artist with the most tracks
        artist_counts: dict[str, int] = {}
        for t in per_track_meta:
            if t["status"] == "Success" and t.get("file_path"):
                try:
                    artist = Path(t["file_path"]).relative_to(ROOT_DIR).parts[0]
                except Exception:
                    artist = "Unknown Artist"
                artist_counts[artist] = artist_counts.get(artist, 0) + 1

        if artist_counts:
            top_artist = sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0]
        else:
            top_artist = "Unknown Artist"

        combined_artist = sanitize_filename(top_artist)
        short_id = uuid.uuid4().hex[:8]

        # Stage tarball in ROOT_DIR first
        staged_tarball = ROOT_DIR / f"{combined_artist}_{short_id}.tar.gz"
        final_tarball = ROOT_DIR / "completed" / quality / staged_tarball.name
        final_tarball.parent.mkdir(parents=True, exist_ok=True)

        if job:
            try:
                job.meta["status"] = "Compressing"
                job.save_meta()
            except Exception:
                pass

        logging.info("Creating tarball: %s", staged_tarball)
        # Run blocking tar creation in a background thread
        def _create_tar_sync():
            try:
                subprocess.run(
                    [
                        "tar", "-I", "pigz -9", "-cf", str(staged_tarball),
                        "-C", str(ROOT_DIR),
                    ] + [str(f.relative_to(ROOT_DIR)) for f in all_final_files],
                    check=True,
                )
                # cleanup files after successful tar
                for f in all_final_files:
                    try:
                        os.remove(f)
                    except Exception:
                        pass
            except (FileNotFoundError, subprocess.CalledProcessError):
                # tar missing raises FileNotFoundError; a missing pigz makes tar exit
                # non-zero (CalledProcessError) → fall back to Python's tarfile (slower)
                logging.warning("pigz/tar unavailable, falling back to tarfile (slower).")
                with tarfile.open(staged_tarball, "w:gz") as tar:
                    for f in all_final_files:
                        try:
                            arcname = f.relative_to(ROOT_DIR)
                        except ValueError:
                            arcname = f.name
                        tar.add(f, arcname=str(arcname))
                        try:
                            os.remove(f)
                        except Exception:
                            pass
        await asyncio.to_thread(_create_tar_sync)

        # sanity check
        if not staged_tarball.exists():
            logging.error("Tarball was not created: %s", staged_tarball)
            if job:
                try:
                    job.meta["status"] = "compress_failed"
                    job.save_meta()
                except Exception:
                    pass
            return []

        logging.critical("Tarball created: %s", staged_tarball)

        # Now move tarball into completed folder
        try:
            staged_tarball.rename(final_tarball)
        except Exception:
            shutil.move(str(staged_tarball), str(final_tarball))

        logging.critical("Tarball finalized: %s", final_tarball)

        await asyncio.to_thread(cleanup_empty_dirs, ROOT_DIR)

        if job:
            job.meta["tarball"] = str(final_tarball)
            job.meta["progress"] = 100
            job.meta["status"] = "Completed"
            job.save_meta()

        return [str(final_tarball)]
    # Run async part synchronously for RQ
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(process_tracks())
    except Exception as e:
        if job:
            job.meta["status"] = "Failed"
            job.save_meta()
        logging.critical("Exception: %s", str(e))
        raise  # re-raise so RQ records the job as failed instead of returning None
    finally:
        loop.close()
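
# ---------- Example usage (illustrative sketch, not part of the original job code) ----------
# Assumes a local Redis instance and an RQ worker importing this module; the queue
# name "rips", the Redis URL, the track IDs, and the timeout are placeholders.
if __name__ == "__main__":
    from redis import Redis
    from rq import Queue

    queue = Queue("rips", connection=Redis.from_url("redis://localhost:6379/0"))
    job = queue.enqueue(bulk_download, ["1001", "1002"], "FLAC", job_timeout=3600)
    print(f"Enqueued bulk_download job: {job.id}")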