# api/utils/rip_background.py
import logging
import asyncio
import random
import os
import tarfile
import traceback
import uuid
import subprocess
import shutil
from pathlib import Path
from typing import Optional
import aiohttp
from datetime import datetime, timezone
from mediafile import MediaFile, Image, ImageType # type: ignore[import]
from rq import get_current_job
from utils.sr_wrapper import SRUtil, MetadataFetchError
from dotenv import load_dotenv
import re

# ---------- Config ----------
# Load .env before reading env vars, otherwise TRIP_WEBHOOK_URI may be empty.
load_dotenv()

ROOT_DIR = Path("/storage/music2")
MAX_RETRIES = 4
THROTTLE_MIN = 0.0
THROTTLE_MAX = 0.0
DISCORD_WEBHOOK = os.getenv("TRIP_WEBHOOK_URI", "").strip()
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/116.0.5845.97 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Connection": "keep-alive",
}

# Logging is configured in base.py - don't override here
sr = SRUtil()
logger = logging.getLogger(__name__)


async def check_flac_stream(file_path):
    """Check if the given file contains a FLAC stream using ffprobe."""
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "a:0",
        "-show_entries",
        "stream=codec_name",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, _ = await process.communicate()
    return b"flac" in stdout
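
# A minimal usage sketch (assumes ffprobe from FFmpeg is on PATH):
#
#   import asyncio
#   is_flac = asyncio.run(check_flac_stream("/tmp/sample.flac"))
#   # True when ffprobe reports "flac" as the codec of the first audio stream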


# ---------- Discord helper ----------
async def discord_notify(
    webhook_url: str,
    title: str,
    description: str,
    target: Optional[str] = None,
    color: int = 0x00FF00,
):
    embed = {
        "title": title,
        "description": description[:1900] if description else "",
        "color": color,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    if target:
        embed["fields"] = [{"name": "Target", "value": str(target), "inline": True}]
    payload = {
        "embeds": [embed],
    }
    while True:  # permanent retry
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status >= 400:
                        text = await resp.text()
                        raise RuntimeError(
                            f"Discord webhook failed ({resp.status}): {text}"
                        )
            break
        except Exception as e:
            print(f"Discord send failed, retrying: {e}")
            await asyncio.sleep(5)


def send_log_to_discord(message: str, level: str, target: Optional[str] = None):
    colors = {"WARNING": 0xFFA500, "ERROR": 0xFF0000, "CRITICAL": 0xFF0000}
    color = colors.get(level.upper(), 0xFFFF00)

    async def _send():
        await discord_notify(
            webhook_url=DISCORD_WEBHOOK,
            title=f"{level} in bulk_download",
            description=message,
            target=target,
            color=color,
        )

    try:
        asyncio.get_running_loop()
        # already in an event loop - schedule a task
        asyncio.create_task(_send())
    except RuntimeError:
        # not in an event loop - safe to run
        asyncio.run(_send())
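
# Note: the create_task() branch above is fire-and-forget; asyncio only keeps
# weak references to tasks, so a slow webhook send could in principle be
# garbage-collected before completing. Holding the returned task would make it
# durable if that ever matters here.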


# ---------- Helpers ----------
def tag_with_mediafile(file_path: str, meta: dict):
    f = MediaFile(file_path)

    def safe_set(attr, value, default=None, cast=None):
        if value is None:
            value = default
        if value is not None:
            if cast:
                setattr(f, attr, cast(value))
            else:
                setattr(f, attr, str(value))

    safe_set("title", meta.get("title"), default="Unknown Title")
    safe_set("artist", meta.get("artist"), default="Unknown Artist")
    safe_set("albumartist", meta.get("album_artist"), default="Unknown Artist")
    safe_set("album", meta.get("album"), default="Unknown Album")
    safe_set("track", meta.get("track_number"), default=0, cast=int)
    safe_set("disc", meta.get("disc_number"), default=0, cast=int)
    safe_set("isrc", meta.get("isrc"), default="")
    safe_set("bpm", meta.get("bpm"), default=0, cast=int)

    release_date_str = meta.get("release_date")
    release_date_obj = None
    if release_date_str:
        try:
            release_date_obj = datetime.fromisoformat(release_date_str).date()
        except ValueError:
            try:
                # Fall back to just the year (e.g. "1997" or a partial date)
                release_date_obj = datetime(int(release_date_str[:4]), 1, 1).date()
            except Exception:
                pass
    if release_date_obj:
        f.date = release_date_obj

    # Attach album art if provided in meta (synchronous fallback)
    try:
        cover_bytes = meta.get("cover_bytes")
        cover_url = None
        if not cover_bytes:
            cover_url = meta.get("cover_art_url") or meta.get("cover_url")
        if not cover_bytes and cover_url:
            try:
                import requests

                resp = requests.get(cover_url, timeout=10)
                resp.raise_for_status()
                cover_bytes = resp.content
            except Exception:
                cover_bytes = None
        if cover_bytes:
            try:
                img = Image(cover_bytes, desc=None, type=ImageType.front)
                f.images = [img]
            except Exception:
                pass
    except Exception:
        pass
    f.save()


def sanitize_filename(name: str) -> str:
    if not name:
        return "Unknown"
    name = name.replace("/", "-").replace("\\", "-")
    name = re.sub(r'[<>:"|?*\x00-\x1F]', "", name)
    name = name.strip().strip(".")
    name = re.sub(r"\s+", " ", name)
    return name[:180] or "Unknown"
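
# Roughly what the sanitizer produces (illustrative inputs):
#
#   sanitize_filename('AC/DC: "Back in Black"?')  ->  'AC-DC Back in Black'
#   sanitize_filename("   ...   ")                ->  'Unknown'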


def ensure_unique_path(p: Path) -> Path:
    parent = p.parent
    stem, suffix = p.stem, p.suffix
    existing = {f.name for f in parent.glob(f"*{suffix}") if f.is_file()}
    candidate = f"{stem}{suffix}"
    if candidate not in existing:
        return parent / candidate
    counter = 2
    while True:
        candidate = f"{stem} ({counter}){suffix}"
        if candidate not in existing:
            return parent / candidate
        counter += 1


def ensure_unique_filename_in_dir(parent: Path, filename: str) -> Path:
    """Return a Path in `parent` with a unique filename.

    Handles multi-part extensions like `.tar.gz` so names become
    `Name (2).tar.gz` instead of `Name.tar (2).tar.gz`.
    """
    parent.mkdir(parents=True, exist_ok=True)
    # special-case .tar.gz
    if filename.lower().endswith(".tar.gz"):
        ext = ".tar.gz"
        base = filename[: -len(ext)]
    else:
        p = Path(filename)
        ext = p.suffix
        base = p.stem
    existing = {f.name for f in parent.iterdir() if f.is_file()}
    candidate = f"{base}{ext}"
    if candidate not in existing:
        return parent / candidate
    counter = 2
    while True:
        candidate = f"{base} ({counter}){ext}"
        if candidate not in existing:
            return parent / candidate
        counter += 1
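
# Behavior sketch: with "Mixtape.tar.gz" already present in `parent`, the helper
# returns "Mixtape (2).tar.gz" (not "Mixtape.tar (2).gz"), because the .tar.gz
# special case splits the multi-part extension before appending the counter:
#
#   ensure_unique_filename_in_dir(Path("/tmp/out"), "Mixtape.tar.gz")
#   -> PosixPath('/tmp/out/Mixtape (2).tar.gz')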


# ---------- bulk_download ----------
def bulk_download(track_list: list, quality: str = "FLAC"):
    """
    RQ job:
      - fetches stream URLs
      - downloads with retries + throttling
      - uses SR metadata to name/organize files
      - creates ONE tarball for all tracks
      - returns [tarball_path]
      - sends relevant messages to Discord
    """
    job = get_current_job()
    job_id = job.id if job else uuid.uuid4().hex
    target = job.meta.get("target") if job else None
    staging_root = ROOT_DIR / job_id
    if job:
        try:
            job.meta["track_ids"] = [str(t) for t in (track_list or [])]
            job.meta["tracks"] = []
            job.meta["progress"] = 0
            job.meta["tarball"] = None
            job.meta["status"] = "Started"
            job.save_meta()
        except Exception as e:
            send_log_to_discord(f"Failed to init job.meta: {e}", "WARNING", target)

    # Job started Discord message
    asyncio.run(
        discord_notify(
            DISCORD_WEBHOOK,
            title=f"Job Started: {job_id}",
            description=f"Processing `{len(track_list)}` track(s)",
            target=target,
            color=0x00FFFF,
        )
    )

    async def process_tracks(track_list):
        per_track_meta = []
        all_final_files = []
        all_artists = set()
        (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
        session = aiohttp.ClientSession(headers=HEADERS)
        try:
            print(f"DEBUG: Starting process_tracks with {len(track_list)} tracks")

            # Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil
            async def _rate_limit_notify(exc: Exception):
                try:
                    send_log_to_discord(
                        f"Rate limit observed while fetching metadata: {exc}",
                        "WARNING",
                        target,
                    )
                except Exception:
                    pass

            # attach callback and reset notified flag for this job run
            try:
                sr.on_rate_limit = _rate_limit_notify
                sr._rate_limit_notified = False
            except Exception:
                pass

            total = len(track_list or [])
            for i, track_id in enumerate(track_list or []):
                print(f"DEBUG: Processing track {i + 1}/{total}: {track_id}")
                track_info = {
                    "track_id": str(track_id),
                    "title": None,
                    "artist": None,
                    "status": "Pending",
                    "file_path": None,
                    "filename": None,
                    "error": None,
                    "attempts": 0,
                }
                attempt = 0

                # Fetch metadata FIRST to check if track is available before attempting download
                md = None
                try:
                    print(f"DEBUG: Fetching metadata for track {track_id}")
                    md = await sr.get_metadata_by_track_id(track_id) or {}
                    print(f"DEBUG: Metadata fetched: {bool(md)}")
                    # Check if track is streamable
                    if md and not md.get("streamable", True):
                        print(f"TRACK {track_id}: Not streamable, skipping")
                        track_info["status"] = "Failed"
                        track_info["error"] = "Track not streamable"
                        track_info["title"] = md.get("title") or f"Track {track_id}"
                        track_info["artist"] = md.get("artist") or "Unknown Artist"
                        per_track_meta.append(track_info)
                        if job:
                            job.meta["tracks"] = per_track_meta
                            job.meta["progress"] = int(((i + 1) / total) * 100)
                            job.save_meta()
                        continue  # Skip to next track
                except MetadataFetchError as me:
                    # Permanent metadata failure - mark failed and skip
                    print(f"TRACK {track_id}: Metadata fetch failed permanently: {me}")
                    track_info["status"] = "Failed"
                    track_info["error"] = str(me)
                    track_info["title"] = f"Track {track_id}"
                    track_info["artist"] = "Unknown Artist"
                    per_track_meta.append(track_info)
                    if job:
                        job.meta["tracks"] = per_track_meta
                        job.meta["progress"] = int(((i + 1) / total) * 100)
                        job.save_meta()
                    continue  # Skip to next track
                except Exception as meta_err:
                    # Non-permanent error - will retry during download attempts
                    print(
                        f"TRACK {track_id}: Metadata prefetch failed (will retry): {meta_err}"
                    )
                    md = None
                while attempt < MAX_RETRIES:
                    tmp_file = None
                    attempt += 1
                    track_info["attempts"] = attempt
                    try:
                        print(f"DEBUG: Getting downloadable for track {track_id}")
                        # Fetch downloadable (handles DASH and others)
                        downloadable = await sr._safe_api_call(
                            sr.streamrip_client.get_downloadable,
                            str(track_id),
                            2 if quality == "FLAC" else 1,
                            retries=3,
                        )
                        print(f"DEBUG: Got downloadable: {type(downloadable)}")
                        if not downloadable:
                            raise RuntimeError("No downloadable created")
                        ext = f".{downloadable.extension}"
                        tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")
                        print(f"DEBUG: Starting download to {tmp_file}")

                        # Download
                        print(f"TRACK {track_id}: Starting download")
                        try:
                            await downloadable._download(
                                str(tmp_file), callback=lambda x=None: None
                            )
                            print(
                                f"TRACK {track_id}: Download method completed normally"
                            )
                        except Exception as download_e:
                            print(
                                f"TRACK {track_id}: Download threw exception: {download_e}"
                            )
                            raise
                        print(
                            f"DEBUG: Download completed, file exists: {tmp_file.exists()}"
                        )
                        if not tmp_file.exists():
                            raise RuntimeError(
                                f"Download completed but no file created: {tmp_file}"
                            )

                        # If we didn't get metadata earlier, try again now
                        if not md:
                            print(f"DEBUG: Re-fetching metadata for track {track_id}")
                            try:
                                md = await sr.get_metadata_by_track_id(track_id) or {}
                            except Exception:
                                md = {}
                        artist_raw = md.get("artist") or "Unknown Artist"
                        album_raw = md.get("album") or "Unknown Album"
                        title_raw = md.get("title") or f"Track {track_id}"
                        artist = sanitize_filename(artist_raw)
                        album = sanitize_filename(album_raw)
                        title = sanitize_filename(title_raw)

                        # Populate track_info fields so job meta contains the user-visible data
                        track_info["title"] = title
                        track_info["artist"] = artist
                        print(f"TRACK {track_id}: Processing '{title}' by {artist}")
                        all_artists.add(artist)
                        album_dir = staging_root / artist / album
                        album_dir.mkdir(parents=True, exist_ok=True)
                        final_file = ensure_unique_path(album_dir / f"{title}{ext}")

                        # Move to final location
                        print(f"TRACK {track_id}: Moving to final location...")
                        shutil.move(str(tmp_file), str(final_file))
                        print(f"TRACK {track_id}: File moved successfully")

                        # Fetch cover art: prefer a direct album id, then a nested
                        # album dict, then any cover_url already in the metadata
                        try:
                            album_field = md.get("album")
                            album_id = md.get("album_id") or (
                                album_field.get("id")
                                if isinstance(album_field, dict)
                                else None
                            )
                        except Exception:
                            album_id = None
                        if album_id:
                            try:
                                cover_url = await sr.get_cover_by_album_id(
                                    album_id, size=640
                                )
                            except Exception:
                                cover_url = None
                        else:
                            cover_url = md.get("cover_url")

                        # Embed tags
                        embedded = False
                        img_bytes = None
                        if cover_url:
                            try:
                                timeout = aiohttp.ClientTimeout(total=15)
                                async with session.get(
                                    cover_url, timeout=timeout
                                ) as img_resp:
                                    if img_resp.status == 200:
                                        img_bytes = await img_resp.read()
                                    else:
                                        img_bytes = None
                                        try:
                                            send_log_to_discord(
                                                f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`",
                                                "WARNING",
                                                target,
                                            )
                                        except Exception:
                                            pass
                            except Exception as e:
                                img_bytes = None
                                try:
                                    send_log_to_discord(
                                        f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`",
                                        "WARNING",
                                        target,
                                    )
                                except Exception:
                                    pass

                        # Try music_tag first; fall back to mediafile below
                        try:
                            from music_tag import load_file as mt_load_file  # type: ignore

                            # Validate the `mf` object before using it
                            try:
                                mf = mt_load_file(str(final_file))
                                if mf is not None:
                                    if md.get("title"):
                                        mf["title"] = md.get("title")
                                    if md.get("artist"):
                                        mf["artist"] = md.get("artist")
                                    if md.get("album"):
                                        mf["album"] = md.get("album")
                                    tracknum = md.get("track_number")
                                    if tracknum is not None:
                                        try:
                                            mf["tracknumber"] = int(tracknum)
                                        except Exception:
                                            pass
                                    if img_bytes:
                                        mf["artwork"] = img_bytes
                                    mf.save()
                                    embedded = True
                                else:
                                    logger.error("Failed to load file with music_tag.")
                                    embedded = False
                            except Exception:
                                embedded = False
                        except Exception:
                            embedded = False

                        if not embedded:
                            try:
                                if cover_url and not img_bytes:
                                    send_log_to_discord(
                                        f"Cover art not available for track {track_id} album_id={album_id} url={cover_url}",
                                        "WARNING",
                                        target,
                                    )
                            except Exception:
                                pass
                            try:
                                tag_with_mediafile(str(final_file), md)
                            except Exception:
                                pass

                        # Success
                        tmp_file = None
                        track_info["status"] = "Success"
                        track_info["file_path"] = str(final_file)
                        try:
                            track_info["filename"] = final_file.name
                        except Exception:
                            track_info["filename"] = None
                        track_info["error"] = None
                        all_final_files.append(final_file)
                        print(
                            f"TRACK {track_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%"
                        )
                        # Throttle after successful download to avoid hitting server too quickly
                        await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
                        if job:
                            job.meta["progress"] = int(((i + 1) / total) * 100)
                            job.meta["tracks"] = per_track_meta + [track_info]
                            job.save_meta()
                        break
                    except aiohttp.ClientResponseError as e:
                        msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}"
                        send_log_to_discord(msg, "WARNING", target)
                        # On 429, back off exponentially. On 5xx, recreate the session and refresh the Tidal client.
                        if getattr(e, "status", None) == 429:
                            wait_time = min(60, 2**attempt)
                            await asyncio.sleep(wait_time)
                        elif 500 <= getattr(e, "status", 0) < 600:
                            # Recreate local aiohttp session on 5xx errors
                            try:
                                await session.close()
                            except Exception:
                                pass
                            session = aiohttp.ClientSession(headers=HEADERS)
                            # Also force a fresh Tidal login in case the upstream session is stale
                            try:
                                await sr._force_fresh_login()
                                send_log_to_discord(
                                    f"Refreshed Tidal session after 5xx error on track {track_id}",
                                    "WARNING",
                                    target,
                                )
                            except Exception as login_err:
                                send_log_to_discord(
                                    f"Failed to refresh Tidal session: {login_err}",
                                    "ERROR",
                                    target,
                                )
                            await asyncio.sleep(
                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
                            )
                        else:
                            await asyncio.sleep(
                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
                            )
                    except Exception as e:
                        tb = traceback.format_exc()
                        err_str = str(e).lower()
                        is_no_stream_url = (
                            isinstance(e, RuntimeError) and str(e) == "No stream URL"
                        )
                        # Check if this is a 5xx error from the server (may appear in the error message)
                        is_5xx_error = any(
                            code in err_str for code in ("500", "502", "503", "504")
                        )
                        # Check for permanent failures that should NOT be retried
                        is_not_found = any(
                            phrase in err_str
                            for phrase in (
                                "track not found",
                                "not found",
                                "404",
                                "does not exist",
                                "no longer available",
                                "asset is not ready",
                            )
                        )
                        if is_not_found:
                            # Permanent failure - do not retry
                            msg = (
                                f"Track {track_id} not found/unavailable, skipping: {e}"
                            )
                            print(msg)
                            send_log_to_discord(msg, "WARNING", target)
                            track_info["status"] = "Failed"
                            track_info["error"] = str(e)
                            break  # Exit retry loop immediately
                        elif is_5xx_error:
                            msg = (
                                f"Track {track_id} attempt {attempt} server error: {e}"
                            )
                            send_log_to_discord(msg, "WARNING", target)
                            track_info["error"] = err_str
                            # Recreate local aiohttp session
                            try:
                                await session.close()
                            except Exception:
                                pass
                            session = aiohttp.ClientSession(headers=HEADERS)
                            # Force a fresh Tidal login
                            try:
                                await sr._force_fresh_login()
                                send_log_to_discord(
                                    f"Refreshed Tidal session after 5xx error on track {track_id}",
                                    "WARNING",
                                    target,
                                )
                            except Exception as login_err:
                                send_log_to_discord(
                                    f"Failed to refresh Tidal session: {login_err}",
                                    "ERROR",
                                    target,
                                )
                            if attempt >= MAX_RETRIES:
                                track_info["status"] = "Failed"
                                send_log_to_discord(
                                    f"Track {track_id} failed after {attempt} attempts (5xx)",
                                    "ERROR",
                                    target,
                                )
                            await asyncio.sleep(
                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
                            )
                        elif is_no_stream_url:
                            # Only log the first and last attempts to avoid spamming Discord
                            if attempt == 1 or attempt == MAX_RETRIES:
                                msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}"
                                send_log_to_discord(msg, "ERROR", target)
                            track_info["error"] = str(e)
                            if attempt >= MAX_RETRIES:
                                track_info["status"] = "Failed"
                                send_log_to_discord(
                                    f"Track {track_id} failed after {attempt} attempts",
                                    "ERROR",
                                    target,
                                )
                            await asyncio.sleep(
                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
                            )
                        else:
                            msg = (
                                f"Track {track_id} attempt {attempt} failed: {e}\n{tb}"
                            )
                            send_log_to_discord(msg, "ERROR", target)
                            track_info["error"] = str(e)
                            if attempt >= MAX_RETRIES:
                                track_info["status"] = "Failed"
                                send_log_to_discord(
                                    f"Track {track_id} failed after {attempt} attempts",
                                    "ERROR",
                                    target,
                                )
                            await asyncio.sleep(
                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
                            )
                    finally:
                        try:
                            if tmp_file and tmp_file.exists():
                                os.remove(tmp_file)
                        except Exception:
                            pass

                # Ensure placeholders and filename for the job metadata
                track_info["title"] = track_info.get("title") or f"Track {track_id}"
                track_info["artist"] = track_info.get("artist") or "Unknown Artist"
                if track_info.get("file_path") and not track_info.get("filename"):
                    try:
                        track_info["filename"] = Path(track_info["file_path"]).name
                    except Exception:
                        track_info["filename"] = None
                per_track_meta.append(track_info)
        finally:
            try:
                await session.close()
            except Exception:
                pass

        if not all_final_files:
            if job:
                job.meta["tarball"] = None
                job.meta["status"] = "Failed"
                job.save_meta()
            send_log_to_discord(
                f"No tracks were successfully downloaded for job `{job_id}`",
                "CRITICAL",
                target,
            )
            return []

        # Tarball creation: label the archive by the most frequent artist
        artist_counts = {}
        for t in per_track_meta:
            if t["status"] == "Success" and t.get("file_path"):
                try:
                    artist = Path(t["file_path"]).relative_to(staging_root).parts[0]
                except Exception:
                    artist = "Unknown Artist"
                artist_counts[artist] = artist_counts.get(artist, 0) + 1
        top_artist = (
            sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0]
            if artist_counts
            else "Unknown Artist"
        )

        # Prefer `job.meta['target']` when provided by the enqueuer. Fall back to the top artist.
        target_name = None
        try:
            if job and job.meta:
                target_name = job.meta.get("target")
        except Exception:
            target_name = None
        base_label = (
            sanitize_filename(target_name)
            if target_name
            else sanitize_filename(top_artist)
        )
        staged_tarball = staging_root / f"{base_label}.tar.gz"
        counter = 1
        # Use base_label, not Path.stem: the stem of "X.tar.gz" is "X.tar",
        # which would produce "X.tar (2).tar.gz" on collision.
        base_name = base_label
        while staged_tarball.exists():
            counter += 1
            staged_tarball = staging_root / f"{base_name} ({counter}).tar.gz"

        final_dir = Path("/storage/music/TRIP")
        final_dir.mkdir(parents=True, exist_ok=True)
        # Ensure we don't overwrite an existing final tarball. Preserve `.tar.gz` style.
        final_tarball = ensure_unique_filename_in_dir(final_dir, staged_tarball.name)

        if job:
            job.meta["status"] = "Compressing"
            job.save_meta()
        logger.info("Creating tarball: %s", staged_tarball)
        await discord_notify(
            DISCORD_WEBHOOK,
            title=f"Compressing: Job {job_id}",
            description=f"Creating tarball: `{len(all_final_files)}` track(s).\nStaging path: {staged_tarball}",
            color=0xFFA500,
            target=target,
        )

        try:
            subprocess.run(
                [
                    "tar",
                    "-I",
                    "pigz -9",
                    "-cf",
                    str(staged_tarball),
                    "-C",
                    str(staging_root),
                ]
                + [str(f.relative_to(staging_root)) for f in all_final_files],
                check=True,
            )
            for f in all_final_files:
                try:
                    os.remove(f)
                except Exception:
                    pass
        except (FileNotFoundError, subprocess.CalledProcessError):
            # FileNotFoundError: tar itself is missing. CalledProcessError: tar ran
            # but failed, e.g. because pigz is not installed. Either way, fall back
            # to the stdlib tarfile module.
            send_log_to_discord(
                "pigz/tar unavailable or failed, falling back to tarfile (slower).",
                "WARNING",
                target,
            )
            with tarfile.open(staged_tarball, "w:gz") as tar:
                for f in all_final_files:
                    try:
                        arcname = f.relative_to(staging_root)
                    except ValueError:
                        arcname = f.name
                    tar.add(f, arcname=str(arcname))
                    try:
                        os.remove(f)
                    except Exception:
                        pass
        except Exception as e:
            send_log_to_discord(f"Tar creation failed: {e}", "ERROR", target)
            if job:
                job.meta["status"] = "compress_failed"
                job.save_meta()
            # Do not proceed further if tarball creation failed
            await asyncio.sleep(0.1)
            return []

        if not staged_tarball.exists():
            send_log_to_discord(
                f"Tarball was not created: `{staged_tarball}`", "CRITICAL", target
            )
            if job:
                job.meta["status"] = "compress_failed"
                job.save_meta()
            return []

        try:
            staged_tarball.rename(final_tarball)
        except Exception:
            # rename() fails across filesystems; fall back to a copy-based move
            shutil.move(str(staged_tarball), str(final_tarball))
        await asyncio.to_thread(shutil.rmtree, staging_root, ignore_errors=True)

        if job:
            job.meta["tarball"] = str(final_tarball)
            job.meta["progress"] = 100
            job.meta["status"] = "Completed"
            job.save_meta()

        # Job completed Discord message
        completed = len(all_final_files)
        failed = len(track_list) - completed
        await discord_notify(
            DISCORD_WEBHOOK,
            title=f"Job Completed: {job_id}",
            description=f"Processed `{len(track_list)}` track(s).\nCompleted: `{completed}`\nFailed: `{failed}`\nTarball: `{final_tarball}`",
            target=target,
            color=0x00FF00,
        )
        # Always log the final tarball path for debugging
        logger.info("Job %s finished, tarball: %s", job_id, final_tarball)
        return [str(final_tarball)]

    # Run the async pipeline on a dedicated event loop (RQ workers are synchronous)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(process_tracks(track_list))
    except Exception as e:
        send_log_to_discord(
            f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target
        )
        if job:
            job.meta["status"] = "Failed"
            job.save_meta()
        return []
    finally:
        loop.close()
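
# Enqueue sketch (hedged: the queue name, Redis connection, and timeout below
# are assumptions for illustration, not defined by this module):
#
#   from redis import Redis
#   from rq import Queue
#
#   q = Queue("rips", connection=Redis())
#   job = q.enqueue(
#       bulk_download,
#       ["12345678", "87654321"],     # hypothetical track IDs
#       "FLAC",
#       meta={"target": "someuser"},  # read back above as job.meta["target"]
#       job_timeout=3600,
#   )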


# ---------- Example: FLAC stream validation ----------
# Illustrative sketch showing where check_flac_stream() would slot into a
# per-track loop. Renamed from `process_tracks` to avoid confusion with the
# inner coroutine in bulk_download; it is not wired into the job above, and
# the combined .m4s path is a placeholder.
async def validate_flac_streams(track_list):
    for track_id in track_list or []:
        combined_path = f"/tmp/{uuid.uuid4().hex}_combined.m4s"  # Example path
        if not await check_flac_stream(combined_path):
            logger.error(f"No FLAC stream found in {combined_path}. Skipping file.")
            continue
        # Proceed with decoding pipeline here