Add bulk video download functionality

- Implemented `bulk_video_download` function to handle video downloads, including metadata fetching, HLS stream handling, and tarball creation.
- Enhanced `bulk_download` function in `rip_background.py` to improve error logging with formatted track descriptions.
- Added video search and metadata retrieval methods in `sr_wrapper.py` for better integration with Tidal's video API.
- Updated the Tidal client credentials in `sr_wrapper.py` to be stored base64-encoded instead of in plaintext.
This commit is contained in:
2026-02-18 13:38:26 -05:00
parent 9d16c96490
commit d6689b9c38
4 changed files with 1257 additions and 81 deletions

View File

@@ -18,6 +18,7 @@ import json
import os
import time
import asyncio
import ssl
from typing import Optional, Any
from dataclasses import dataclass, field
from enum import Enum
@@ -82,10 +83,14 @@ class Lighting:
# Configuration
TOKEN_EXPIRY_BUFFER = 300 # Consider token expired 5 min before actual expiry
CONNECTION_READY_TIMEOUT = 15 # Max seconds to wait for TCP connection to be ready
COMMAND_TIMEOUT = 10 # Max seconds for a single device command
COMMAND_LOCK_TIMEOUT = 30 # Max seconds to wait for command lock
COMMAND_DELAY = 0.3 # Delay between sequential commands
MAX_RETRIES = 3
MAX_CONSECUTIVE_FAILURES = 5 # Force full reconnect after this many failures
HEALTH_CHECK_INTERVAL = 30 # Check connection health every 30s
MAX_CONNECTION_AGE = 1800 # Force reconnect after 30 minutes
MAX_IDLE_SECONDS = 300 # Reconnect if idle for 5 minutes
TWO_FA_POLL_INTERVAL = 5 # Poll for 2FA code every 5 seconds
TWO_FA_TIMEOUT = 300 # 5 minutes to enter 2FA code
REDIS_2FA_KEY = "cync:2fa_code"
@@ -113,6 +118,7 @@ class Lighting:
# Connection state
self._state = CyncConnectionState()
self._connection_lock = asyncio.Lock()
self._command_lock = asyncio.Lock()
self._health_task: Optional[asyncio.Task] = None
self._2fa_task: Optional[asyncio.Task] = None
@@ -323,6 +329,10 @@ class Lighting:
logger.info("Cync TCP manager not connected; will reconnect")
return False
if self._is_connection_stale():
logger.info("Cync connection is stale; will reconnect")
return False
# Check token expiry
if self._is_token_expired():
logger.info("Token expired or expiring soon")
@@ -330,6 +340,18 @@ class Lighting:
return True
def _is_connection_stale(self) -> bool:
"""Reconnect if the connection is too old or has been idle too long."""
now = time.time()
if self._state.connected_at and now - self._state.connected_at > self.MAX_CONNECTION_AGE:
return True
last_activity = self._state.last_successful_command or self._state.connected_at
if last_activity and now - last_activity > self.MAX_IDLE_SECONDS:
return True
return False
def _is_tcp_connected(self) -> bool:
"""Best-effort check that the pycync TCP connection is alive."""
client = getattr(self._state.cync_api, "_command_client", None)
@@ -359,6 +381,24 @@ class Lighting:
return True
def _is_ssl_closed_error(self, error: Exception) -> bool:
    """Heuristically detect that the SSL/TCP link was torn down under us."""
    # Exception types that unambiguously indicate a dropped connection.
    closed_types = (
        ssl.SSLError,
        aiohttp.ClientConnectionError,
        ConnectionResetError,
        BrokenPipeError,
        ConnectionError,
        asyncio.IncompleteReadError,
    )
    if isinstance(error, closed_types):
        return True
    # Fall back to matching known phrases in the error text.
    text = str(error).lower()
    return any(
        phrase in text
        for phrase in ("ssl connection is closed", "connection reset")
    )
def _is_token_expired(self) -> bool:
"""Check if token is expired or will expire soon."""
if not self._state.user:
@@ -433,9 +473,12 @@ class Lighting:
)
try:
self._state.user = await self._state.auth.login()
self._save_cached_token(self._state.user)
logger.info("Cync login successful")
if self._state.auth:
self._state.user = await self._state.auth.login()
self._save_cached_token(self._state.user)
logger.info("Cync login successful")
else:
raise TwoFactorRequiredError("Unknown 2FA error")
except TwoFactorRequiredError:
await self._handle_2fa()
except AuthFailedError as e:
@@ -644,6 +687,11 @@ class Lighting:
needs_reconnect = True
reason = "TCP connection lost"
# Reconnect if connection is stale
elif self._is_connection_stale():
needs_reconnect = True
reason = "connection stale"
if needs_reconnect:
logger.warning(f"Health monitor triggering reconnection: {reason}")
self._state.status = ConnectionStatus.CONNECTING
@@ -707,26 +755,38 @@ class Lighting:
device = await self._get_device()
logger.info(f"Sending commands to device: {device.name}")
# Power
if power == "on":
await device.turn_on()
logger.debug("Sent turn_on")
else:
await device.turn_off()
logger.debug("Sent turn_off")
# Power - with timeout to prevent hangs
try:
if power == "on":
await asyncio.wait_for(device.turn_on(), timeout=self.COMMAND_TIMEOUT)
logger.debug("Sent turn_on")
else:
await asyncio.wait_for(device.turn_off(), timeout=self.COMMAND_TIMEOUT)
logger.debug("Sent turn_off")
except asyncio.TimeoutError:
raise TimeoutError(f"Power command timed out after {self.COMMAND_TIMEOUT}s")
await asyncio.sleep(self.COMMAND_DELAY)
# Brightness
# Brightness - with timeout
if brightness is not None:
await device.set_brightness(brightness)
logger.debug(f"Sent brightness: {brightness}")
try:
await asyncio.wait_for(device.set_brightness(brightness), timeout=self.COMMAND_TIMEOUT)
logger.debug(f"Sent brightness: {brightness}")
except asyncio.TimeoutError:
raise TimeoutError(f"Brightness command timed out after {self.COMMAND_TIMEOUT}s")
await asyncio.sleep(self.COMMAND_DELAY)
# Color
# Color - with timeout
if rgb:
await device.set_rgb(rgb)
logger.debug(f"Sent RGB: {rgb}")
await asyncio.sleep(self.COMMAND_DELAY)
try:
await asyncio.wait_for(device.set_rgb(rgb), timeout=self.COMMAND_TIMEOUT)
logger.debug(f"Sent RGB: {rgb}")
except asyncio.TimeoutError:
raise TimeoutError(f"RGB command timed out after {self.COMMAND_TIMEOUT}s")
# Verify connection is still alive after sending
if not self._is_tcp_connected():
raise ConnectionError("Cync TCP connection lost after command")
# Track success
now = time.time()
@@ -851,46 +911,65 @@ class Lighting:
"""Apply state to device with connection retry logic."""
last_error: Optional[Exception] = None
for attempt in range(self.MAX_RETRIES):
try:
# Ensure connection (force reconnect on retries)
await self._connect(force=(attempt > 0))
# Try to acquire command lock with timeout to prevent indefinite waits
try:
await asyncio.wait_for(
self._command_lock.acquire(), timeout=self.COMMAND_LOCK_TIMEOUT
)
except asyncio.TimeoutError:
logger.error(f"Command lock acquisition timed out after {self.COMMAND_LOCK_TIMEOUT}s")
raise TimeoutError("Another command is in progress and not responding")
# Send commands
await self._send_commands(power, brightness, rgb)
return # Success
try:
for attempt in range(self.MAX_RETRIES):
try:
# Ensure connection (force reconnect on retries)
await self._connect(force=(attempt > 0))
except (AuthFailedError, TwoFactorRequiredError) as e:
last_error = e
self._state.consecutive_failures += 1
self._state.last_error = str(e)
logger.warning(f"Auth error on attempt {attempt + 1}: {e}")
self._clear_cached_token()
# Send commands
await self._send_commands(power, brightness, rgb)
return # Success
except TimeoutError as e:
last_error = e
self._state.consecutive_failures += 1
self._state.last_error = str(e)
logger.warning(f"Timeout on attempt {attempt + 1}: {e}")
except (AuthFailedError, TwoFactorRequiredError) as e:
last_error = e
self._state.consecutive_failures += 1
self._state.last_error = str(e)
logger.warning(f"Auth error on attempt {attempt + 1}: {e}")
self._clear_cached_token()
except Exception as e:
last_error = e
self._state.consecutive_failures += 1
self._state.last_error = str(e)
logger.warning(
f"Error on attempt {attempt + 1}: {type(e).__name__}: {e}"
)
except TimeoutError as e:
last_error = e
self._state.consecutive_failures += 1
self._state.last_error = str(e)
logger.warning(f"Timeout on attempt {attempt + 1}: {e}")
# Wait before retry (exponential backoff)
if attempt < self.MAX_RETRIES - 1:
wait_time = 2**attempt
logger.info(f"Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
except Exception as e:
last_error = e
self._state.consecutive_failures += 1
self._state.last_error = str(e)
if self._is_ssl_closed_error(e) or not self._is_tcp_connected():
logger.warning(
"Connection lost during command; will reconnect and retry"
)
self._state.status = ConnectionStatus.CONNECTING
self._update_status_in_redis()
else:
logger.warning(
f"Error on attempt {attempt + 1}: {type(e).__name__}: {e}"
)
# All retries failed
self._update_status_in_redis()
logger.error(f"All {self.MAX_RETRIES} attempts failed")
raise last_error or RuntimeError("Failed to apply lighting state")
# Wait before retry (exponential backoff)
if attempt < self.MAX_RETRIES - 1:
wait_time = 2**attempt
logger.info(f"Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
# All retries failed
self._update_status_in_redis()
logger.error(f"All {self.MAX_RETRIES} attempts failed")
raise last_error or RuntimeError("Failed to apply lighting state")
finally:
self._command_lock.release()
# =========================================================================
# Connection Status & 2FA Endpoints

View File

@@ -16,7 +16,7 @@ from rq.registry import (
FailedJobRegistry,
ScheduledJobRegistry,
)
from utils.rip_background import bulk_download
from utils.rip_background import bulk_download, bulk_video_download
from lyric_search.sources import private
from typing import Literal
from pydantic import BaseModel
@@ -30,6 +30,11 @@ class ValidBulkFetchRequest(BaseModel):
quality: Literal["FLAC", "Lossy"] = "FLAC"
class ValidVideoBulkFetchRequest(BaseModel):
video_ids: list[int]
target: str
class RIP(FastAPI):
"""
Ripping Endpoints
@@ -65,6 +70,13 @@ class RIP(FastAPI):
"trip/jobs/list": self.job_list_handler,
"trip/auth/start": self.tidal_auth_start_handler,
"trip/auth/check": self.tidal_auth_check_handler,
# Video endpoints - order matters: specific routes before parameterized ones
"trip/videos/search": self.video_search_handler,
"trip/videos/artist/{artist_id:path}": self.videos_by_artist_handler,
"trip/videos/bulk_fetch": self.video_bulk_fetch_handler,
"trip/video/{video_id:path}/stream": self.video_stream_handler,
"trip/video/{video_id:path}/download": self.video_download_handler,
"trip/video/{video_id:path}": self.video_metadata_handler,
}
# Store pending device codes for auth flow
@@ -77,10 +89,10 @@ class RIP(FastAPI):
handler,
methods=(
["GET"]
if endpoint not in ("trip/bulk_fetch", "trip/auth/check")
if endpoint not in ("trip/bulk_fetch", "trip/auth/check", "trip/videos/bulk_fetch")
else ["POST"]
),
include_in_schema=False,
include_in_schema=True,
dependencies=dependencies,
)
@@ -530,3 +542,250 @@ class RIP(FastAPI):
content={"error": str(e)},
status_code=500,
)
# =========================================================================
# Video Endpoints
# =========================================================================
async def video_search_handler(
    self, q: str, request: Request, limit: int = 50, user=Depends(get_current_user)
) -> Response:
    """
    Search Tidal for videos matching a free-text query.

    Parameters:
    - **q** (str): Search query (artist, song title, etc.)
    - **limit** (int): Maximum number of results (default 50).
    - **request** (Request): The request object.
    - **user**: Current user (dependency).

    Returns:
    - **Response**: JSON with a "videos" list, or a 400/404 error payload.
    """
    # Endpoint is restricted to the "trip" and "admin" roles.
    roles = user.get("roles", [])
    if not any(role in roles for role in ("trip", "admin")):
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    query = (q or "").strip()
    if not query:
        return JSONResponse(
            content={"error": "Query parameter 'q' is required"},
            status_code=400,
        )
    videos = await self.trip_util.search_videos(query, limit=limit)
    if videos:
        return JSONResponse(content={"videos": videos})
    return JSONResponse(
        content={"error": "No videos found"},
        status_code=404,
    )
async def video_metadata_handler(
    self, video_id: str, request: Request, user=Depends(get_current_user)
) -> Response:
    """
    Fetch metadata for a single Tidal video.

    Parameters:
    - **video_id** (str): The Tidal video ID.
    - **request** (Request): The request object.
    - **user**: Current user (dependency).

    Returns:
    - **Response**: JSON metadata payload, or a 400/404 error payload.
    """
    # Endpoint is restricted to the "trip" and "admin" roles.
    roles = user.get("roles", [])
    if not any(role in roles for role in ("trip", "admin")):
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    try:
        vid_id = int(video_id)
    except ValueError:
        return JSONResponse(
            content={"error": "Invalid video ID"},
            status_code=400,
        )
    metadata = await self.trip_util.get_video_metadata(vid_id)
    if metadata:
        return JSONResponse(content=metadata)
    return JSONResponse(
        content={"error": "Video not found"},
        status_code=404,
    )
async def video_stream_handler(
    self, video_id: str, request: Request, user=Depends(get_current_user)
) -> Response:
    """
    Resolve the playable stream URL for a Tidal video.

    Parameters:
    - **video_id** (str): The Tidal video ID.
    - **request** (Request): The request object.
    - **user**: Current user (dependency).

    Returns:
    - **Response**: JSON with "stream_url", or a 400/404 error payload.
    """
    # Endpoint is restricted to the "trip" and "admin" roles.
    roles = user.get("roles", [])
    if not any(role in roles for role in ("trip", "admin")):
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    try:
        vid_id = int(video_id)
    except ValueError:
        return JSONResponse(
            content={"error": "Invalid video ID"},
            status_code=400,
        )
    stream_url = await self.trip_util.get_video_stream_url(vid_id)
    if stream_url:
        return JSONResponse(content={"stream_url": stream_url})
    return JSONResponse(
        content={"error": "Video stream not available"},
        status_code=404,
    )
async def videos_by_artist_handler(
    self, artist_id: str, request: Request, user=Depends(get_current_user)
) -> Response:
    """
    List all videos attributed to one Tidal artist.

    Parameters:
    - **artist_id** (str): The Tidal artist ID.
    - **request** (Request): The request object.
    - **user**: Current user (dependency).

    Returns:
    - **Response**: JSON with a "videos" list, or a 400/404 error payload.
    """
    # Endpoint is restricted to the "trip" and "admin" roles.
    roles = user.get("roles", [])
    if not any(role in roles for role in ("trip", "admin")):
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    try:
        art_id = int(artist_id)
    except ValueError:
        return JSONResponse(
            content={"error": "Invalid artist ID"},
            status_code=400,
        )
    videos = await self.trip_util.get_videos_by_artist_id(art_id)
    if videos:
        return JSONResponse(content={"videos": videos})
    return JSONResponse(
        content={"error": "No videos found for this artist"},
        status_code=404,
    )
async def video_download_handler(
    self, video_id: str, request: Request, user=Depends(get_current_user)
) -> Response:
    """
    Download one video and return it as an MP4 file response.

    Parameters:
    - **video_id** (str): The Tidal video ID.
    - **request** (Request): The request object.
    - **user**: Current user (dependency).

    Returns:
    - **Response**: FileResponse streaming the MP4, or an error payload.
    """
    from fastapi.responses import FileResponse
    import os

    # Endpoint is restricted to the "trip" and "admin" roles.
    roles = user.get("roles", [])
    if not any(role in roles for role in ("trip", "admin")):
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    try:
        vid_id = int(video_id)
    except ValueError:
        return JSONResponse(
            content={"error": "Invalid video ID"},
            status_code=400,
        )
    # Metadata is fetched first because it drives the user-facing filename.
    metadata = await self.trip_util.get_video_metadata(vid_id)
    if not metadata:
        return JSONResponse(
            content={"error": "Video not found"},
            status_code=404,
        )
    # Fetch the media itself to local storage.
    file_path = await self.trip_util.download_video(vid_id)
    if not file_path or not os.path.exists(file_path):
        return JSONResponse(
            content={"error": "Failed to download video"},
            status_code=500,
        )
    # Build "<artist> - <title>.mp4"; strip path separators from the name.
    artist_name = metadata.get("artist", "Unknown")
    video_title = metadata.get("title", f"video_{vid_id}")
    safe_filename = (
        f"{artist_name} - {video_title}.mp4".replace("/", "-").replace("\\", "-")
    )
    return FileResponse(
        path=file_path,
        filename=safe_filename,
        media_type="video/mp4",
    )
async def video_bulk_fetch_handler(
    self,
    data: ValidVideoBulkFetchRequest,
    request: Request,
    user=Depends(get_current_user),
) -> Response:
    """
    Enqueue a background RQ job that bulk-downloads a list of video IDs.

    Parameters:
    - **data** (ValidVideoBulkFetchRequest): Bulk video fetch request data
      (video_ids and a target label).
    - **request** (Request): The request object.
    - **user**: Current user (dependency).

    Returns:
    - **Response**: JSON with the enqueued job id/status, or a 400 error.
    """
    if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []):
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    if not data or not data.video_ids or not data.target:
        # Fix: invalid input now returns HTTP 400 instead of a 200 with an
        # error body, matching the other video endpoints' error semantics.
        return JSONResponse(
            content={
                "err": True,
                "errorText": "Invalid data",
            },
            status_code=400,
        )
    video_ids = data.video_ids
    target = data.target
    job = self.task_queue.enqueue(
        bulk_video_download,
        args=(video_ids,),
        job_timeout=28800,  # 8 hours for videos
        failure_ttl=86400,
        result_ttl=-1,
        meta={
            "progress": 0,
            "status": "Queued",
            "target": target,
            "videos_in": len(video_ids),
            "type": "video",
        },
    )
    # Track the job id so the job-list endpoint can enumerate it.
    self.redis_conn.lpush("enqueued_job_ids", job.id)
    return JSONResponse(
        content={
            "job_id": job.id,
            "status": "Queued",
            "videos": len(video_ids),
            "target": target,
        }
    )

View File

@@ -320,6 +320,15 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
}
attempt = 0
def _track_desc() -> str:
    """Build a human-readable track label for Discord/log lines."""
    title = track_info.get("title") or f"Track {track_id}"
    artist = track_info.get("artist") or "Unknown"
    album = track_info.get("album") or ""
    base = f"{track_id} - '{title}' by {artist}"
    # Append the album only when we actually know it.
    return f"{base} [{album}]" if album else base
# Fetch metadata FIRST to check if track is available before attempting download
md = None
try:
@@ -327,6 +336,12 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
md = await sr.get_metadata_by_track_id(track_id) or {}
print(f"DEBUG: Metadata fetched: {bool(md)}")
# Populate track_info immediately so failure logs have useful info
if md:
track_info["title"] = md.get("title") or f"Track {track_id}"
track_info["artist"] = md.get("artist") or "Unknown Artist"
track_info["album"] = md.get("album") or "Unknown Album"
# Check if track is streamable
if md and not md.get("streamable", True):
print(f"TRACK {track_id}: Not streamable, skipping")
@@ -564,7 +579,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
break
except aiohttp.ClientResponseError as e:
msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}"
msg = f"Track {_track_desc()} attempt {attempt} ClientResponseError: {e}"
send_log_to_discord(msg, "WARNING", target)
# If 429, backoff as before. If 5xx, recreate session and refresh Tidal client.
if getattr(e, "status", None) == 429:
@@ -581,7 +596,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
try:
await sr._force_fresh_login()
send_log_to_discord(
f"Refreshed Tidal session after 5xx error on track {track_id}",
f"Refreshed Tidal session after 5xx error on track {_track_desc()}",
"WARNING",
target,
)
@@ -625,7 +640,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
if is_not_found:
# Permanent failure - do not retry
msg = (
f"Track {track_id} not found/unavailable, skipping: {e}"
f"Track {_track_desc()} not found/unavailable, skipping: {e}"
)
print(msg)
send_log_to_discord(msg, "WARNING", target)
@@ -634,7 +649,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
break # Exit retry loop immediately
elif is_5xx_error:
msg = (
f"Track {track_id} attempt {attempt} server error: {e}"
f"Track {_track_desc()} attempt {attempt} server error: {e}"
)
send_log_to_discord(msg, "WARNING", target)
track_info["error"] = err_str
@@ -648,7 +663,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
try:
await sr._force_fresh_login()
send_log_to_discord(
f"Refreshed Tidal session after 5xx error on track {track_id}",
f"Refreshed Tidal session after 5xx error on track {_track_desc()}",
"WARNING",
target,
)
@@ -661,7 +676,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
if attempt >= MAX_RETRIES:
track_info["status"] = "Failed"
send_log_to_discord(
f"Track {track_id} failed after {attempt} attempts (5xx)",
f"Track {_track_desc()} failed after {attempt} attempts (5xx)",
"ERROR",
target,
)
@@ -670,13 +685,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
)
elif is_no_stream_url:
if attempt == 1 or attempt == MAX_RETRIES:
msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}"
msg = f"Track {_track_desc()} attempt {attempt} failed: {e}\n{tb}"
send_log_to_discord(msg, "ERROR", target)
track_info["error"] = str(e)
if attempt >= MAX_RETRIES:
track_info["status"] = "Failed"
send_log_to_discord(
f"Track {track_id} failed after {attempt} attempts",
f"Track {_track_desc()} failed after {attempt} attempts",
"ERROR",
target,
)
@@ -685,14 +700,14 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
)
else:
msg = (
f"Track {track_id} attempt {attempt} failed: {e}\n{tb}"
f"Track {_track_desc()} attempt {attempt} failed: {e}\n{tb}"
)
send_log_to_discord(msg, "ERROR", target)
track_info["error"] = str(e)
if attempt >= MAX_RETRIES:
track_info["status"] = "Failed"
send_log_to_discord(
f"Track {track_id} failed after {attempt} attempts",
f"Track {_track_desc()} failed after {attempt} attempts",
"ERROR",
target,
)
@@ -885,11 +900,367 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
loop.close()
# Correct integration of FLAC stream check
async def process_tracks(track_list):
    """Skip staged files that carry no FLAC stream before decoding."""
    for index, track_id in enumerate(track_list or []):
        combined_path = f"/tmp/{uuid.uuid4().hex}_combined.m4s"  # example staging path
        has_flac = await check_flac_stream(combined_path)
        if not has_flac:
            logger.error(f"No FLAC stream found in {combined_path}. Skipping file.")
            continue
        # Proceed with decoding pipeline
# ---------- bulk_video_download ----------
def bulk_video_download(video_list: list):
    """
    RQ job for bulk video downloads:
    - fetches video metadata and HLS streams
    - downloads with ffmpeg in highest quality
    - creates ONE tarball for all videos
    - returns [tarball_path]
    - sends relevant messages to Discord

    Args:
        video_list: Tidal video IDs to download.

    Returns:
        A one-element list with the final tarball path on success, [] when
        nothing downloaded or compression failed, and implicitly None when
        the whole job raises (see the outer except below).
    """
    job = get_current_job()
    # Fall back to a random id so the staging directory is unique outside RQ.
    job_id = job.id if job else uuid.uuid4().hex
    target = job.meta.get("target") if job else None
    staging_root = ROOT_DIR / f"video_{job_id}"
    # Initialize job metadata so pollers see a consistent shape immediately.
    if job:
        try:
            job.meta["video_ids"] = [str(v) for v in (video_list or [])]
            job.meta["videos"] = []
            job.meta["progress"] = 0
            job.meta["tarball"] = None
            job.meta["status"] = "Started"
            job.save_meta()
        except Exception as e:
            send_log_to_discord(f"Failed to init job.meta: {e}", "WARNING", target)
    # Job started Discord message
    asyncio.run(
        discord_notify(
            DISCORD_WEBHOOK,
            title=f"Video Job Started: {job_id}",
            description=f"Processing `{len(video_list)}` video(s)",
            target=target,
            color=0x00FFFF,
        )
    )

    async def process_videos(video_list):
        """Download every video, then stage, compress and publish the tarball."""
        # Accumulators: per-video status dicts, staged files, artist names.
        per_video_meta = []
        all_final_files = []
        all_artists = set()
        (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
        total = len(video_list or [])
        for i, video_id in enumerate(video_list or []):
            print(f"DEBUG: Processing video {i + 1}/{total}: {video_id}")
            # Tracking record for this single video; stored in job.meta later.
            video_info = {
                "video_id": str(video_id),
                "title": None,
                "artist": None,
                "status": "Pending",
                "file_path": None,
                "filename": None,
                "error": None,
                "attempts": 0,
            }
            attempt = 0

            def _video_desc() -> str:
                """Format video info for log messages."""
                title = video_info.get("title") or f"Video {video_id}"
                artist = video_info.get("artist") or "Unknown"
                return f"{video_id} - '{title}' by {artist}"

            # Fetch metadata first so failure logs carry title/artist info.
            md = None
            try:
                print(f"DEBUG: Fetching metadata for video {video_id}")
                md = await sr.get_video_metadata(video_id)
                print(f"DEBUG: Metadata fetched: {bool(md)}")
                if md:
                    video_info["title"] = md.get("title") or f"Video {video_id}"
                    video_info["artist"] = md.get("artist") or "Unknown Artist"
            except Exception as meta_err:
                # Non-fatal: metadata is retried after a successful download.
                print(f"VIDEO {video_id}: Metadata fetch failed: {meta_err}")
                md = None
            # Retry loop for the download itself.
            while attempt < MAX_RETRIES:
                attempt += 1
                video_info["attempts"] = attempt
                try:
                    # Use sr.download_video which handles HLS and quality selection
                    print(f"VIDEO {video_id}: Starting download (attempt {attempt})")
                    # Download to temporary location
                    tmp_dir = Path(f"/tmp/video_{uuid.uuid4().hex}")
                    tmp_dir.mkdir(parents=True, exist_ok=True)
                    # Add timeout for video download (30 minutes max per video)
                    try:
                        file_path = await asyncio.wait_for(
                            sr.download_video(video_id, str(tmp_dir)),
                            timeout=1800  # 30 minutes
                        )
                    except asyncio.TimeoutError:
                        print(f"VIDEO {video_id}: Download timed out after 30 minutes")
                        raise RuntimeError("Download timed out after 30 minutes")
                    if not file_path or not Path(file_path).exists():
                        raise RuntimeError("Download completed but no file created")
                    # If we didn't get metadata earlier, try again
                    if not md:
                        try:
                            md = await sr.get_video_metadata(video_id)
                        except Exception:
                            md = {}
                    md = md or {}
                    artist_raw = md.get("artist") or "Unknown Artist"
                    title_raw = md.get("title") or f"Video {video_id}"
                    # Sanitized values double as directory / file names below.
                    artist = sanitize_filename(artist_raw)
                    title = sanitize_filename(title_raw)
                    video_info["title"] = title
                    video_info["artist"] = artist
                    print(f"VIDEO {video_id}: Processing '{title}' by {artist}")
                    all_artists.add(artist)
                    # Stage under <staging_root>/<artist>/<title>.mp4.
                    video_dir = staging_root / artist
                    video_dir.mkdir(parents=True, exist_ok=True)
                    final_file = ensure_unique_path(video_dir / f"{title}.mp4")
                    # Move to final location
                    print(f"VIDEO {video_id}: Moving to final location...")
                    shutil.move(str(file_path), str(final_file))
                    # Clean up temp dir
                    try:
                        shutil.rmtree(tmp_dir, ignore_errors=True)
                    except Exception:
                        pass
                    print(f"VIDEO {video_id}: File moved successfully")
                    # Success
                    video_info["status"] = "Success"
                    video_info["file_path"] = str(final_file)
                    try:
                        video_info["filename"] = final_file.name
                    except Exception:
                        video_info["filename"] = None
                    video_info["error"] = None
                    all_final_files.append(final_file)
                    print(
                        f"VIDEO {video_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%"
                    )
                    # Throttle
                    await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
                    # NOTE(review): progress/meta only advance on success —
                    # failed videos do not bump "progress"; confirm intended.
                    if job:
                        job.meta["progress"] = int(((i + 1) / total) * 100)
                        job.meta["videos"] = per_video_meta + [video_info]
                        job.save_meta()
                    break
                except Exception as e:
                    tb = traceback.format_exc()
                    err_str = str(e).lower()
                    # Permanent failures are matched by message text; these
                    # skip the remaining retries entirely.
                    is_not_found = any(
                        phrase in err_str
                        for phrase in (
                            "video not found",
                            "not found",
                            "404",
                            "does not exist",
                            "no longer available",
                        )
                    )
                    if is_not_found:
                        msg = f"Video {_video_desc()} not found/unavailable, skipping: {e}"
                        print(msg)
                        send_log_to_discord(msg, "WARNING", target)
                        video_info["status"] = "Failed"
                        video_info["error"] = str(e)
                        break
                    else:
                        msg = f"Video {_video_desc()} attempt {attempt} failed: {e}\n{tb}"
                        send_log_to_discord(msg, "ERROR", target)
                        video_info["error"] = str(e)
                        if attempt >= MAX_RETRIES:
                            video_info["status"] = "Failed"
                            send_log_to_discord(
                                f"Video {_video_desc()} failed after {attempt} attempts",
                                "ERROR",
                                target,
                            )
                        # Throttle before the next retry.
                        await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
            # Ensure placeholders for job metadata
            video_info["title"] = video_info.get("title") or f"Video {video_id}"
            video_info["artist"] = video_info.get("artist") or "Unknown Artist"
            if video_info.get("file_path") and not video_info.get("filename"):
                try:
                    video_info["filename"] = Path(video_info["file_path"]).name
                except Exception:
                    video_info["filename"] = None
            per_video_meta.append(video_info)
        # Nothing succeeded: mark the job failed and bail out.
        if not all_final_files:
            if job:
                job.meta["tarball"] = None
                job.meta["status"] = "Failed"
                job.save_meta()
            send_log_to_discord(
                f"No videos were successfully downloaded for job `{job_id}`",
                "CRITICAL",
                target,
            )
            return []
        # Tarball creation
        # Name the tarball after the job target if set, otherwise after the
        # artist with the most successful downloads (ties broken by name).
        artist_counts = {}
        for v in per_video_meta:
            if v["status"] == "Success" and v.get("file_path"):
                try:
                    artist = Path(v["file_path"]).relative_to(staging_root).parts[0]
                except Exception:
                    artist = "Unknown Artist"
                artist_counts[artist] = artist_counts.get(artist, 0) + 1
        top_artist = (
            sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0]
            if artist_counts
            else "Unknown Artist"
        )
        target_name = None
        try:
            if job and job.meta:
                target_name = job.meta.get("target")
        except Exception:
            target_name = None
        base_label = (
            sanitize_filename(target_name)
            if target_name
            else sanitize_filename(top_artist)
        )
        staged_tarball = staging_root / f"{base_label}_videos.tar.gz"
        # De-duplicate within the staging directory.
        counter = 1
        base_name = staged_tarball.stem
        while staged_tarball.exists():
            counter += 1
            staged_tarball = staging_root / f"{base_name} ({counter}).tar.gz"
        final_dir = Path("/storage/music/TRIP/videos")
        final_dir.mkdir(parents=True, exist_ok=True)
        final_tarball = ensure_unique_filename_in_dir(final_dir, staged_tarball.name)
        if job:
            job.meta["status"] = "Compressing"
            job.save_meta()
        logging.info("Creating video tarball: %s", staged_tarball)
        await discord_notify(
            DISCORD_WEBHOOK,
            title=f"Compressing: Video Job {job_id}",
            description=f"Creating tarball: `{len(all_final_files)}` video(s).\nStaging path: {staged_tarball}",
            color=0xFFA500,
            target=target,
        )
        # Prefer parallel gzip via `tar -I pigz`; fall back to the pure-Python
        # tarfile module if the tar binary is missing. Source files are
        # deleted as they are archived in both paths.
        try:
            subprocess.run(
                [
                    "tar",
                    "-I",
                    "pigz -9",
                    "-cf",
                    str(staged_tarball),
                    "-C",
                    str(staging_root),
                ]
                + [str(f.relative_to(staging_root)) for f in all_final_files],
                check=True,
            )
            for f in all_final_files:
                try:
                    os.remove(f)
                except Exception:
                    pass
        except FileNotFoundError:
            # NOTE(review): this branch fires when `tar` itself is absent; a
            # missing pigz raises CalledProcessError and lands in the generic
            # handler below instead — confirm that is acceptable.
            send_log_to_discord(
                "pigz not available, falling back to tarfile (slower).",
                "WARNING",
                target,
            )
            with tarfile.open(staged_tarball, "w:gz") as tar:
                for f in all_final_files:
                    try:
                        arcname = f.relative_to(staging_root)
                    except ValueError:
                        arcname = f.name
                    tar.add(f, arcname=str(arcname))
                    try:
                        os.remove(f)
                    except Exception:
                        pass
        except Exception as e:
            send_log_to_discord(f"Video tar creation failed: {e}", "ERROR", target)
            if job:
                job.meta["status"] = "compress_failed"
                job.save_meta()
            return []
        if not staged_tarball.exists():
            send_log_to_discord(
                f"Video tarball was not created: `{staged_tarball}`", "CRITICAL", target
            )
            if job:
                job.meta["status"] = "compress_failed"
                job.save_meta()
            return []
        # Publish: rename is atomic on the same filesystem; fall back to a
        # copy-based move across devices.
        try:
            staged_tarball.rename(final_tarball)
        except Exception:
            shutil.move(str(staged_tarball), str(final_tarball))
        await asyncio.to_thread(shutil.rmtree, staging_root, ignore_errors=True)
        if job:
            job.meta["tarball"] = str(final_tarball)
            job.meta["progress"] = 100
            job.meta["status"] = "Completed"
            job.save_meta()
        # Job completed Discord message
        completed = len(all_final_files)
        failed = len(video_list) - completed
        await discord_notify(
            DISCORD_WEBHOOK,
            title=f"Video Job Completed: {job_id}",
            description=f"Processed `{len(video_list)}` video(s).\nCompleted: `{completed}`\nFailed: `{failed}`\nTarball: `{final_tarball}`",
            target=target,
            color=0x00FF00,
        )
        logging.info("Video job %s finished, tarball: %s", job_id, final_tarball)
        return [str(final_tarball)]

    # Run the async pipeline on a dedicated event loop (RQ workers are sync).
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(process_videos(video_list))
    except Exception as e:
        # NOTE(review): this path returns None (not a list) and leaves
        # staging_root on disk — confirm callers tolerate both.
        send_log_to_discord(
            f"bulk_video_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target
        )
        if job:
            job.meta["status"] = "Failed"
            job.save_meta()
    finally:
        loop.close()

View File

@@ -12,12 +12,19 @@ import json
import os
import aiohttp
import time
import base64
# Monkey-patch streamrip's Tidal client credentials BEFORE importing TidalClient
import streamrip.client.tidal as _tidal_module # type: ignore # noqa: E402
_tidal_module.CLIENT_ID = "fX2JxdmntZWK0ixT"
_tidal_module.CLIENT_SECRET = "1Nn9AfDAjxrgJFJbKNWLeAyKGVGmINuXPPLHVXAvxAg="
CLIENT_ID = base64.b64decode("ZlgySnhkbW50WldLMGl4VA==").decode("iso-8859-1")
CLIENT_SECRET = base64.b64decode(
"MU5tNUFmREFqeHJnSkZKYktOV0xlQXlLR1ZHbUlOdVhQUExIVlhBdnhBZz0=",
).decode("iso-8859-1")
_tidal_module.CLIENT_ID = CLIENT_ID
_tidal_module.CLIENT_SECRET = CLIENT_SECRET
_tidal_module.AUTH = aiohttp.BasicAuth(
login=_tidal_module.CLIENT_ID, password=_tidal_module.CLIENT_SECRET
)
@@ -306,14 +313,21 @@ class SRUtil:
if not token_expiry:
return True # No expiry info means we should refresh
try:
# token_expiry is typically an ISO timestamp string
if isinstance(token_expiry, str):
from datetime import datetime
expiry_dt = datetime.fromisoformat(token_expiry.replace("Z", "+00:00"))
expiry_ts = expiry_dt.timestamp()
else:
# token_expiry can be a Unix timestamp (float/int/string) or ISO string
if not isinstance(token_expiry, str):
expiry_ts = float(token_expiry)
else:
# Try parsing as a numeric Unix timestamp first
try:
expiry_ts = float(token_expiry)
except ValueError:
# Fall back to ISO format string
from datetime import datetime
expiry_dt = datetime.fromisoformat(
token_expiry.replace("Z", "+00:00")
)
expiry_ts = expiry_dt.timestamp()
return expiry_ts < (time.time() + TIDAL_TOKEN_REFRESH_BUFFER)
except Exception as e:
logging.warning("Failed to parse token expiry '%s': %s", token_expiry, e)
@@ -1167,6 +1181,459 @@ class SRUtil:
logging.critical("Error: %s", str(e))
return False
# =========================================================================
# Video Support
# =========================================================================
async def search_videos(self, query: str, limit: int = 50) -> Optional[list[dict]]:
    """Search Tidal for videos matching a query string.

    Retries the search on HTTP 400/429 errors with exponential backoff
    (1s, 2s, 4s) before giving up.

    Args:
        query: Search query (artist name, song title, etc.).
        limit: Maximum number of results to return.

    Returns:
        A list of dicts with ``id``, ``title``, ``artist``, ``duration``,
        ``duration_formatted``, ``release_date``, ``image_id``, ``image_url``
        and ``quality`` keys, or None when the search fails or yields nothing.
    """
    max_retries = 4
    delay = 1.0
    for attempt in range(max_retries):
        try:
            results = await self._safe_api_call(
                self.streamrip_client.search,
                media_type="video",
                query=query,
                limit=limit,
                retries=3,
            )
            break
        except Exception as e:
            msg = str(e)
            # Back off and retry on throttling / transient request errors.
            if ("400" in msg or "429" in msg) and attempt < max_retries - 1:
                await asyncio.sleep(delay)
                delay *= 2
                continue
            logging.warning("Video search failed: %s", e)
            return None
    else:
        # Loop exhausted without a successful call.
        return None
    if not results:
        return None
    # Results can be paged - get items from the first page.
    if isinstance(results, list):
        results_page = results[0] if results else {}
    else:
        results_page = results
    items = results_page.get("items", []) if isinstance(results_page, dict) else []
    if not items:
        return None
    videos_out = []
    for item in items:
        # BUGFIX: parenthesize the ternary so a present "artist" key wins even
        # when the "artists" list is absent. Previously `a or b if c else d`
        # parsed as `(a or b) if c else d` and returned {} in that case.
        artist_info = item.get("artist") or (
            item.get("artists", [{}])[0] if item.get("artists") else {}
        )
        artist_name = (
            artist_info.get("name", "Unknown Artist")
            if isinstance(artist_info, dict)
            else str(artist_info)
        )
        videos_out.append({
            "id": item.get("id"),
            "title": item.get("title"),
            "artist": artist_name,
            "duration": item.get("duration"),
            "duration_formatted": self.format_duration(item.get("duration")),
            "release_date": item.get("releaseDate"),
            "image_id": item.get("imageId"),
            "image_url": (
                f"https://resources.tidal.com/images/{item.get('imageId').replace('-', '/')}/640x360.jpg"
                if item.get("imageId")
                else None
            ),
            "quality": item.get("quality"),
        })
    return videos_out
async def get_video_metadata(self, video_id: int) -> Optional[dict]:
    """Fetch and normalize metadata for a single Tidal video.

    Args:
        video_id: The Tidal video ID.

    Returns:
        A flattened metadata dict (artist, artwork URLs, album info, etc.),
        or None when the lookup fails or the API returns nothing.
    """
    try:
        raw = await self._safe_api_call(
            self.streamrip_client.get_metadata,
            item_id=str(video_id),
            media_type="video",
            retries=3,
        )
    except Exception as e:
        logging.warning("get_video_metadata failed for %s: %s", video_id, e)
        return None
    if not raw:
        return None

    # Prefer the singular "artist" entry; fall back to the first of "artists".
    primary = raw.get("artist") or (
        raw.get("artists", [{}])[0] if raw.get("artists") else {}
    )
    primary_name = (
        primary.get("name", "Unknown Artist")
        if isinstance(primary, dict)
        else str(primary)
    )

    # Tidal image IDs become URL path segments by replacing dashes with slashes.
    image_id = raw.get("imageId")
    image_path = image_id.replace("-", "/") if image_id else None
    album = raw.get("album")

    return {
        "id": raw.get("id"),
        "title": raw.get("title"),
        "artist": primary_name,
        "artists": [a.get("name") for a in raw.get("artists", [])],
        "duration": raw.get("duration"),
        "duration_formatted": self.format_duration(raw.get("duration")),
        "release_date": raw.get("releaseDate"),
        "image_id": image_id,
        "image_url": (
            f"https://resources.tidal.com/images/{image_path}/1280x720.jpg"
            if image_path
            else None
        ),
        "thumbnail_url": (
            f"https://resources.tidal.com/images/{image_path}/640x360.jpg"
            if image_path
            else None
        ),
        "quality": raw.get("quality"),
        "explicit": raw.get("explicit"),
        "album": album.get("title") if album else None,
        "album_id": album.get("id") if album else None,
    }
async def get_video_stream_url(self, video_id: int) -> Optional[str]:
    """Resolve the HLS stream URL for a video, with layered fallbacks.

    Resolution order:
      1. streamrip's own ``get_video_file_url``;
      2. if that raises with an HLS manifest leaked into the error text,
         extract the master .m3u8 URL from the message and pick its best
         variant;
      3. otherwise query Tidal's playbackinfo endpoint directly.

    Args:
        video_id: The Tidal video ID.

    Returns:
        A video HLS URL (.m3u8) — the highest-quality variant when one can
        be determined — or None if no method succeeds.
    """
    video_id_str = str(video_id)
    logging.info("VIDEO %s: Fetching stream URL...", video_id)
    try:
        # First try the standard streamrip method.
        logging.info("VIDEO %s: Trying streamrip get_video_file_url...", video_id)
        url = await self._safe_api_call(
            self.streamrip_client.get_video_file_url,
            video_id=video_id_str,
            retries=2,
        )
        if url:
            logging.info("VIDEO %s: Got stream URL via streamrip", video_id)
            return url if url else None
        # NOTE(review): a falsy `url` without an exception falls through and
        # returns None implicitly — confirm that is the intended behavior.
    except Exception as e:
        # Streamrip's get_video_file_url may fail if Tidal returns the HLS
        # manifest directly instead of JSON with URLs; in that case the
        # manifest URL often appears verbatim in the error message.
        err_msg = str(e)
        logging.info("VIDEO %s: streamrip method failed (%s), trying fallback...", video_id, err_msg[:100])
        if "mpegurl" in err_msg.lower() or ".m3u8" in err_msg:
            # Extract the master manifest URL from the error message.
            import re
            m3u8_match = re.search(r"(https://[^\s'\"]+\.m3u8[^\s'\"]*)", err_msg)
            if m3u8_match:
                master_url = m3u8_match.group(1)
                logging.info("VIDEO %s: Extracted HLS master URL from error", video_id)
                # Prefer the highest-quality variant; fall back to the master.
                best_url = await self._get_best_variant_from_master(master_url)
                return best_url or master_url
        # Last resort: fetch the manifest URL directly from the Tidal API.
        try:
            logging.info("VIDEO %s: Trying direct API manifest fetch...", video_id)
            result = await self._get_video_manifest_url(video_id_str)
            if result:
                logging.info("VIDEO %s: Got stream URL via direct API", video_id)
                return result
        except Exception as e2:
            # Log both the original and the fallback failure for diagnosis.
            logging.warning("get_video_stream_url failed for %s: %s (fallback: %s)", video_id, e, e2)
    # All strategies exhausted.
    return None
async def _get_best_variant_from_master(self, master_url: str) -> Optional[str]:
"""Parse HLS master playlist and return the highest quality variant URL."""
import re
try:
# Ensure we have a session
if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session:
self.streamrip_client.session = await self.streamrip_client.get_session()
async with self.streamrip_client.session.get(master_url) as resp:
if resp.status != 200:
return None
playlist_text = await resp.text()
# Parse HLS master playlist for variant streams
stream_pattern = re.compile(
r'#EXT-X-STREAM-INF:.*?BANDWIDTH=(\d+).*?\n([^\n#]+)',
re.MULTILINE
)
matches = stream_pattern.findall(playlist_text)
if matches:
# Sort by bandwidth (highest quality = highest bandwidth)
matches.sort(key=lambda x: int(x[0]), reverse=True)
best_variant = matches[0][1].strip()
# If it's a relative URL, make it absolute
if not best_variant.startswith('http'):
base_url = master_url.rsplit('/', 1)[0]
best_variant = f"{base_url}/{best_variant}"
logging.info("Selected highest quality variant: bandwidth=%s", matches[0][0])
return best_variant
except Exception as e:
logging.warning("Failed to parse HLS master playlist: %s", e)
return None
async def _get_video_manifest_url(self, video_id: str) -> Optional[str]:
"""Directly fetch the HLS manifest URL from Tidal API.
This is a fallback when streamrip's method fails due to format changes.
Returns the highest quality variant URL from the HLS master playlist.
"""
import base64
import re
params = {
"videoquality": "HIGH",
"playbackmode": "STREAM",
"assetpresentation": "FULL",
}
# Ensure we have a session
if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session:
self.streamrip_client.session = await self.streamrip_client.get_session()
# Make the API request
resp = await self.streamrip_client._api_request(
f"videos/{video_id}/playbackinfopostpaywall", params=params
)
if not resp or "manifest" not in resp:
return None
# Decode the manifest
manifest_data = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8"))
# The manifest should contain URLs - get the master playlist URL
urls = manifest_data.get("urls", [])
if not urls:
return None
master_url = urls[0]
# Try to fetch the master playlist and find the highest quality variant
try:
async with self.streamrip_client.session.get(master_url) as resp:
if resp.status == 200:
playlist_text = await resp.text()
# Parse HLS master playlist for variant streams
# Look for lines like: #EXT-X-STREAM-INF:BANDWIDTH=...,RESOLUTION=1920x1080
# followed by the variant URL
stream_pattern = re.compile(
r'#EXT-X-STREAM-INF:.*?BANDWIDTH=(\d+).*?\n([^\n#]+)',
re.MULTILINE
)
matches = stream_pattern.findall(playlist_text)
if matches:
# Sort by bandwidth (highest quality = highest bandwidth)
matches.sort(key=lambda x: int(x[0]), reverse=True)
best_variant = matches[0][1].strip()
# If it's a relative URL, make it absolute
if not best_variant.startswith('http'):
base_url = master_url.rsplit('/', 1)[0]
best_variant = f"{base_url}/{best_variant}"
logging.info("Selected highest quality video variant: bandwidth=%s", matches[0][0])
return best_variant
except Exception as e:
logging.warning("Failed to parse HLS master playlist: %s", e)
# Fall back to returning the master URL (ffmpeg will pick a variant)
return master_url
async def download_video(self, video_id: int, output_path: Optional[str] = None) -> Optional[str]:
    """Download a Tidal video to an MP4 file via ffmpeg.

    Resolves the HLS stream URL, remuxes it with ffmpeg (video copied,
    audio re-encoded to AAC), then verifies the result with ffprobe.

    Args:
        video_id: The Tidal video ID.
        output_path: Optional destination. May be a directory (the file is
            named ``<video_id>.mp4`` inside it) or a full file path. When
            omitted, a unique folder under the streamrip download folder
            is used.

    Returns:
        The path to the downloaded video file, or None on any failure
        (no stream URL, ffmpeg error, empty/corrupt output file).
    """
    try:
        logging.info("VIDEO %s: Getting stream URL...", video_id)
        video_url = await self.get_video_stream_url(video_id)
        if not video_url:
            logging.warning("No video URL for video ID: %s", video_id)
            return None
        logging.info("VIDEO %s: Got stream URL, preparing download...", video_id)
        # Determine output path.
        if not output_path:
            # Random folder name to avoid collisions between concurrent jobs.
            unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
            dl_folder_path = f"{self.streamrip_config.session.downloads.folder}/{unique}"
            try:
                os.makedirs(dl_folder_path, exist_ok=True)
            except Exception:
                # Best-effort: ffmpeg will fail loudly if the folder is unusable.
                pass
            output_path = f"{dl_folder_path}/{video_id}.mp4"
        elif os.path.isdir(output_path):
            # If output_path is a directory, append the video filename.
            output_path = os.path.join(output_path, f"{video_id}.mp4")
        # Video URLs are HLS manifests - use ffmpeg to download.
        logging.info("VIDEO %s: Starting ffmpeg HLS download to %s", video_id, output_path)
        print(f"VIDEO {video_id}: Starting ffmpeg download...")
        cmd = [
            "ffmpeg",
            "-nostdin",  # Don't read from stdin - prevents SIGTTIN in background
            "-hide_banner",
            "-loglevel", "warning",
            "-analyzeduration", "10M",
            "-probesize", "10M",
            "-i", video_url,
            "-c:v", "copy",       # keep the video stream as-is
            "-c:a", "aac",        # re-encode audio to AAC
            "-b:a", "256k",
            "-af", "aresample=async=1:first_pts=0",  # keep A/V in sync
            "-y",                 # overwrite any existing file
            output_path,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        # Use communicate() to avoid buffer deadlocks.
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            stderr_text = stderr.decode().strip() if stderr else "Unknown error"
            logging.error("ffmpeg video download failed for %s: %s", video_id, stderr_text)
            return None
        print(f"VIDEO {video_id}: ffmpeg completed, verifying file...")
        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            logging.error("Video download completed but file missing or empty")
            return None
        # Verify the MP4 is valid (has moov atom) by probing its duration.
        verify_cmd = [
            "ffprobe",
            "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            output_path,
        ]
        verify_proc = await asyncio.create_subprocess_exec(
            *verify_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        verify_stdout, verify_stderr = await verify_proc.communicate()
        if verify_proc.returncode != 0:
            stderr_text = verify_stderr.decode().strip() if verify_stderr else ""
            logging.error("Downloaded video is corrupt (moov atom missing?): %s", stderr_text)
            # Clean up the corrupt file so callers don't pick it up.
            try:
                os.remove(output_path)
            except Exception:
                pass
            return None
        duration = verify_stdout.decode().strip() if verify_stdout else "unknown"
        logging.info("Video %s downloaded to %s (%d bytes, duration: %ss)",
                     video_id, output_path, os.path.getsize(output_path), duration)
        return output_path
    except Exception as e:
        logging.critical("Video download error for %s: %s", video_id, e)
        return None
    return None  # Unreachable (both branches above return), kept to satisfy type checkers
async def get_videos_by_artist_id(self, artist_id: int, limit: int = 50) -> Optional[list[dict]]:
    """Get videos by artist ID.

    Args:
        artist_id: The Tidal artist ID.
        limit: Maximum number of videos to return.

    Returns:
        A list of normalized video dicts (id, title, artist, duration,
        release_date, artwork), or None when the artist has no videos or
        the API call fails.
    """
    artist_id_str = str(artist_id)
    # Ensure we have a session before hitting the raw API endpoint.
    if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session:
        self.streamrip_client.session = await self.streamrip_client.get_session()
    try:
        # Use the direct Tidal API endpoint for artist videos.
        resp = await self._safe_api_call(
            self.streamrip_client._api_request,
            f"artists/{artist_id_str}/videos",
            params={"limit": limit, "offset": 0},
            retries=3,
        )
    except Exception as e:
        logging.warning("get_videos_by_artist_id API call failed: %s", e)
        return None
    if not resp:
        return None
    # The response has an "items" array.
    videos = resp.get("items", [])
    if not videos:
        return None
    videos_out = []
    for video in videos:
        # Prefer the singular "artist" entry; fall back to the first of "artists".
        artist_info = video.get("artist") or (
            video.get("artists", [{}])[0] if video.get("artists") else {}
        )
        artist_name = (
            artist_info.get("name", "Unknown Artist")
            if isinstance(artist_info, dict)
            else str(artist_info)
        )
        videos_out.append({
            "id": video.get("id"),
            "title": video.get("title"),
            "artist": artist_name,
            "duration": video.get("duration"),
            "duration_formatted": self.format_duration(video.get("duration")),
            "release_date": video.get("releaseDate"),
            "image_id": video.get("imageId"),
            "image_url": (
                f"https://resources.tidal.com/images/{video.get('imageId').replace('-', '/')}/640x360.jpg"
                if video.get("imageId")
                else None
            ),
        })
    # FIX: removed the unreachable duplicate `return videos_out` that followed.
    return videos_out
async def get_lrc_by_track_id(self, track_id: int) -> Optional[str]:
"""Get LRC lyrics by track ID."""
logging.debug(f"SR: Fetching metadata for track ID {track_id}")