diff --git a/endpoints/lighting.py b/endpoints/lighting.py index 69b0ea5..a38fe22 100644 --- a/endpoints/lighting.py +++ b/endpoints/lighting.py @@ -18,6 +18,7 @@ import json import os import time import asyncio +import ssl from typing import Optional, Any from dataclasses import dataclass, field from enum import Enum @@ -82,10 +83,14 @@ class Lighting: # Configuration TOKEN_EXPIRY_BUFFER = 300 # Consider token expired 5 min before actual expiry CONNECTION_READY_TIMEOUT = 15 # Max seconds to wait for TCP connection to be ready + COMMAND_TIMEOUT = 10 # Max seconds for a single device command + COMMAND_LOCK_TIMEOUT = 30 # Max seconds to wait for command lock COMMAND_DELAY = 0.3 # Delay between sequential commands MAX_RETRIES = 3 MAX_CONSECUTIVE_FAILURES = 5 # Force full reconnect after this many failures HEALTH_CHECK_INTERVAL = 30 # Check connection health every 30s + MAX_CONNECTION_AGE = 1800 # Force reconnect after 30 minutes + MAX_IDLE_SECONDS = 300 # Reconnect if idle for 5 minutes TWO_FA_POLL_INTERVAL = 5 # Poll for 2FA code every 5 seconds TWO_FA_TIMEOUT = 300 # 5 minutes to enter 2FA code REDIS_2FA_KEY = "cync:2fa_code" @@ -113,6 +118,7 @@ class Lighting: # Connection state self._state = CyncConnectionState() self._connection_lock = asyncio.Lock() + self._command_lock = asyncio.Lock() self._health_task: Optional[asyncio.Task] = None self._2fa_task: Optional[asyncio.Task] = None @@ -323,6 +329,10 @@ class Lighting: logger.info("Cync TCP manager not connected; will reconnect") return False + if self._is_connection_stale(): + logger.info("Cync connection is stale; will reconnect") + return False + # Check token expiry if self._is_token_expired(): logger.info("Token expired or expiring soon") @@ -330,6 +340,18 @@ class Lighting: return True + def _is_connection_stale(self) -> bool: + """Reconnect if the connection is too old or has been idle too long.""" + now = time.time() + if self._state.connected_at and now - self._state.connected_at > self.MAX_CONNECTION_AGE: + return True + + last_activity = self._state.last_successful_command or self._state.connected_at + if last_activity and now - last_activity > self.MAX_IDLE_SECONDS: + return True + + return False + def _is_tcp_connected(self) -> bool: """Best-effort check that the pycync TCP connection is alive.""" client = getattr(self._state.cync_api, "_command_client", None) @@ -359,6 +381,24 @@ class Lighting: return True + def _is_ssl_closed_error(self, error: Exception) -> bool: + """Detect SSL/TCP connection closed errors.""" + if isinstance( + error, + ( + ssl.SSLError, + aiohttp.ClientConnectionError, + ConnectionResetError, + BrokenPipeError, + ConnectionError, + asyncio.IncompleteReadError, + ), + ): + return True + + message = str(error).lower() + return "ssl connection is closed" in message or "connection reset" in message + def _is_token_expired(self) -> bool: """Check if token is expired or will expire soon.""" if not self._state.user: @@ -433,9 +473,12 @@ class Lighting: ) try: - self._state.user = await self._state.auth.login() - self._save_cached_token(self._state.user) - logger.info("Cync login successful") + if self._state.auth: + self._state.user = await self._state.auth.login() + self._save_cached_token(self._state.user) + logger.info("Cync login successful") + else: + raise TwoFactorRequiredError("Unknown 2FA error") except TwoFactorRequiredError: await self._handle_2fa() except AuthFailedError as e: @@ -644,6 +687,11 @@ class Lighting: needs_reconnect = True reason = "TCP connection lost" + # Reconnect if connection is stale + elif self._is_connection_stale(): + needs_reconnect = True + reason = "connection stale" + if needs_reconnect: logger.warning(f"Health monitor triggering reconnection: {reason}") self._state.status = ConnectionStatus.CONNECTING @@ -707,26 +755,38 @@ class Lighting: device = await self._get_device() logger.info(f"Sending commands to device: {device.name}") - # Power - if power == "on": - await device.turn_on() - logger.debug("Sent turn_on") - else: - await device.turn_off() - logger.debug("Sent turn_off") + # Power - with timeout to prevent hangs + try: + if power == "on": + await asyncio.wait_for(device.turn_on(), timeout=self.COMMAND_TIMEOUT) + logger.debug("Sent turn_on") + else: + await asyncio.wait_for(device.turn_off(), timeout=self.COMMAND_TIMEOUT) + logger.debug("Sent turn_off") + except asyncio.TimeoutError: + raise TimeoutError(f"Power command timed out after {self.COMMAND_TIMEOUT}s") await asyncio.sleep(self.COMMAND_DELAY) - # Brightness + # Brightness - with timeout if brightness is not None: - await device.set_brightness(brightness) - logger.debug(f"Sent brightness: {brightness}") + try: + await asyncio.wait_for(device.set_brightness(brightness), timeout=self.COMMAND_TIMEOUT) + logger.debug(f"Sent brightness: {brightness}") + except asyncio.TimeoutError: + raise TimeoutError(f"Brightness command timed out after {self.COMMAND_TIMEOUT}s") await asyncio.sleep(self.COMMAND_DELAY) - # Color + # Color - with timeout if rgb: - await device.set_rgb(rgb) - logger.debug(f"Sent RGB: {rgb}") - await asyncio.sleep(self.COMMAND_DELAY) + try: + await asyncio.wait_for(device.set_rgb(rgb), timeout=self.COMMAND_TIMEOUT) + logger.debug(f"Sent RGB: {rgb}") + except asyncio.TimeoutError: + raise TimeoutError(f"RGB command timed out after {self.COMMAND_TIMEOUT}s") + + # Verify connection is still alive after sending + if not self._is_tcp_connected(): + raise ConnectionError("Cync TCP connection lost after command") # Track success now = time.time() @@ -851,46 +911,65 @@ class Lighting: """Apply state to device with connection retry logic.""" last_error: Optional[Exception] = None - for attempt in range(self.MAX_RETRIES): - try: - # Ensure connection (force reconnect on retries) - await self._connect(force=(attempt > 0)) + # Try to acquire command lock with timeout to prevent indefinite waits + try: + await asyncio.wait_for( + self._command_lock.acquire(), timeout=self.COMMAND_LOCK_TIMEOUT + ) + except asyncio.TimeoutError: + logger.error(f"Command lock acquisition timed out after {self.COMMAND_LOCK_TIMEOUT}s") + raise TimeoutError("Another command is in progress and not responding") - # Send commands - await self._send_commands(power, brightness, rgb) - return # Success + try: + for attempt in range(self.MAX_RETRIES): + try: + # Ensure connection (force reconnect on retries) + await self._connect(force=(attempt > 0)) - except (AuthFailedError, TwoFactorRequiredError) as e: - last_error = e - self._state.consecutive_failures += 1 - self._state.last_error = str(e) - logger.warning(f"Auth error on attempt {attempt + 1}: {e}") - self._clear_cached_token() + # Send commands + await self._send_commands(power, brightness, rgb) + return # Success - except TimeoutError as e: - last_error = e - self._state.consecutive_failures += 1 - self._state.last_error = str(e) - logger.warning(f"Timeout on attempt {attempt + 1}: {e}") + except (AuthFailedError, TwoFactorRequiredError) as e: + last_error = e + self._state.consecutive_failures += 1 + self._state.last_error = str(e) + logger.warning(f"Auth error on attempt {attempt + 1}: {e}") + self._clear_cached_token() - except Exception as e: - last_error = e - self._state.consecutive_failures += 1 - self._state.last_error = str(e) - logger.warning( - f"Error on attempt {attempt + 1}: {type(e).__name__}: {e}" - ) + except TimeoutError as e: + last_error = e + self._state.consecutive_failures += 1 + self._state.last_error = str(e) + logger.warning(f"Timeout on attempt {attempt + 1}: {e}") - # Wait before retry (exponential backoff) - if attempt < self.MAX_RETRIES - 1: - wait_time = 2**attempt - logger.info(f"Retrying in {wait_time}s...") - await asyncio.sleep(wait_time) + except Exception as e: + last_error = e + self._state.consecutive_failures += 1 + self._state.last_error = str(e) + if self._is_ssl_closed_error(e) or not self._is_tcp_connected(): + logger.warning( + "Connection lost during command; will reconnect and retry" + ) + self._state.status = ConnectionStatus.CONNECTING + self._update_status_in_redis() + else: + logger.warning( + f"Error on attempt {attempt + 1}: {type(e).__name__}: {e}" + ) - # All retries failed - self._update_status_in_redis() - logger.error(f"All {self.MAX_RETRIES} attempts failed") - raise last_error or RuntimeError("Failed to apply lighting state") + # Wait before retry (exponential backoff) + if attempt < self.MAX_RETRIES - 1: + wait_time = 2**attempt + logger.info(f"Retrying in {wait_time}s...") + await asyncio.sleep(wait_time) + + # All retries failed + self._update_status_in_redis() + logger.error(f"All {self.MAX_RETRIES} attempts failed") + raise last_error or RuntimeError("Failed to apply lighting state") + finally: + self._command_lock.release() # ========================================================================= # Connection Status & 2FA Endpoints diff --git a/endpoints/rip.py b/endpoints/rip.py index 9c142d3..4a293f8 100644 --- a/endpoints/rip.py +++ b/endpoints/rip.py @@ -16,7 +16,7 @@ from rq.registry import ( FailedJobRegistry, ScheduledJobRegistry, ) -from utils.rip_background import bulk_download +from utils.rip_background import bulk_download, bulk_video_download from lyric_search.sources import private from typing import Literal from pydantic import BaseModel @@ -30,6 +30,11 @@ class ValidBulkFetchRequest(BaseModel): quality: Literal["FLAC", "Lossy"] = "FLAC" +class ValidVideoBulkFetchRequest(BaseModel): + video_ids: list[int] + target: str + + class RIP(FastAPI): """ Ripping Endpoints @@ -65,6 +70,13 @@ class RIP(FastAPI): "trip/jobs/list": self.job_list_handler, "trip/auth/start": self.tidal_auth_start_handler, "trip/auth/check": self.tidal_auth_check_handler, + # Video endpoints - order matters: specific routes before parameterized ones + "trip/videos/search": self.video_search_handler, + "trip/videos/artist/{artist_id:path}": self.videos_by_artist_handler, + "trip/videos/bulk_fetch": self.video_bulk_fetch_handler, + "trip/video/{video_id:path}/stream": self.video_stream_handler, + "trip/video/{video_id:path}/download": self.video_download_handler, + "trip/video/{video_id:path}": self.video_metadata_handler, } # Store pending device codes for auth flow @@ -77,10 +89,10 @@ class RIP(FastAPI): handler, methods=( ["GET"] - if endpoint not in ("trip/bulk_fetch", "trip/auth/check") + if endpoint not in ("trip/bulk_fetch", "trip/auth/check", "trip/videos/bulk_fetch") else ["POST"] ), - include_in_schema=False, + include_in_schema=True, dependencies=dependencies, ) @@ -530,3 +542,250 @@ class RIP(FastAPI): content={"error": str(e)}, status_code=500, ) + + # ========================================================================= + # Video Endpoints + # ========================================================================= + + async def video_search_handler( + self, q: str, request: Request, limit: int = 50, user=Depends(get_current_user) + ) -> Response: + """ + Search for videos by query string. + + Parameters: + - **q** (str): Search query (artist, song title, etc.) + - **limit** (int): Maximum number of results (default 50). + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with video results or 404. + """ + if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + if not q or not q.strip(): + return JSONResponse( + content={"error": "Query parameter 'q' is required"}, + status_code=400, + ) + + videos = await self.trip_util.search_videos(q.strip(), limit=limit) + if not videos: + return JSONResponse( + content={"error": "No videos found"}, + status_code=404, + ) + + return JSONResponse(content={"videos": videos}) + + async def video_metadata_handler( + self, video_id: str, request: Request, user=Depends(get_current_user) + ) -> Response: + """ + Get metadata for a specific video. + + Parameters: + - **video_id** (str): The Tidal video ID. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with video metadata or 404. + """ + if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + try: + vid_id = int(video_id) + except ValueError: + return JSONResponse( + content={"error": "Invalid video ID"}, + status_code=400, + ) + + metadata = await self.trip_util.get_video_metadata(vid_id) + if not metadata: + return JSONResponse( + content={"error": "Video not found"}, + status_code=404, + ) + + return JSONResponse(content=metadata) + + async def video_stream_handler( + self, video_id: str, request: Request, user=Depends(get_current_user) + ) -> Response: + """ + Get the stream URL for a video. + + Parameters: + - **video_id** (str): The Tidal video ID. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with stream URL or 404. + """ + if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + try: + vid_id = int(video_id) + except ValueError: + return JSONResponse( + content={"error": "Invalid video ID"}, + status_code=400, + ) + + stream_url = await self.trip_util.get_video_stream_url(vid_id) + if not stream_url: + return JSONResponse( + content={"error": "Video stream not available"}, + status_code=404, + ) + + return JSONResponse(content={"stream_url": stream_url}) + + async def videos_by_artist_handler( + self, artist_id: str, request: Request, user=Depends(get_current_user) + ) -> Response: + """ + Get videos by artist ID. + + Parameters: + - **artist_id** (str): The Tidal artist ID. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with artist's videos or 404. + """ + if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + try: + art_id = int(artist_id) + except ValueError: + return JSONResponse( + content={"error": "Invalid artist ID"}, + status_code=400, + ) + + videos = await self.trip_util.get_videos_by_artist_id(art_id) + if not videos: + return JSONResponse( + content={"error": "No videos found for this artist"}, + status_code=404, + ) + + return JSONResponse(content={"videos": videos}) + + async def video_download_handler( + self, video_id: str, request: Request, user=Depends(get_current_user) + ) -> Response: + """ + Download a video file. + + Parameters: + - **video_id** (str): The Tidal video ID. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: The video file as a streaming response. + """ + from fastapi.responses import FileResponse + import os + + if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): + raise HTTPException(status_code=403, detail="Insufficient permissions") + + try: + vid_id = int(video_id) + except ValueError: + return JSONResponse( + content={"error": "Invalid video ID"}, + status_code=400, + ) + + # Get video metadata for filename + metadata = await self.trip_util.get_video_metadata(vid_id) + if not metadata: + return JSONResponse( + content={"error": "Video not found"}, + status_code=404, + ) + + # Download the video + file_path = await self.trip_util.download_video(vid_id) + if not file_path or not os.path.exists(file_path): + return JSONResponse( + content={"error": "Failed to download video"}, + status_code=500, + ) + + # Generate a nice filename + artist = metadata.get("artist", "Unknown") + title = metadata.get("title", f"video_{vid_id}") + # Sanitize filename + safe_filename = f"{artist} - {title}.mp4".replace("/", "-").replace("\\", "-") + + return FileResponse( + path=file_path, + filename=safe_filename, + media_type="video/mp4", + ) + + async def video_bulk_fetch_handler( + self, + data: ValidVideoBulkFetchRequest, + request: Request, + user=Depends(get_current_user), + ) -> Response: + """ + Bulk fetch a list of video IDs. + + Parameters: + - **data** (ValidVideoBulkFetchRequest): Bulk video fetch request data. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with job info or error. + """ + if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): + raise HTTPException(status_code=403, detail="Insufficient permissions") + if not data or not data.video_ids or not data.target: + return JSONResponse( + content={ + "err": True, + "errorText": "Invalid data", + } + ) + video_ids = data.video_ids + target = data.target + job = self.task_queue.enqueue( + bulk_video_download, + args=(video_ids,), + job_timeout=28800, # 8 hours for videos + failure_ttl=86400, + result_ttl=-1, + meta={ + "progress": 0, + "status": "Queued", + "target": target, + "videos_in": len(video_ids), + "type": "video", + }, + ) + self.redis_conn.lpush("enqueued_job_ids", job.id) + return JSONResponse( + content={ + "job_id": job.id, + "status": "Queued", + "videos": len(video_ids), + "target": target, + } + ) \ No newline at end of file diff --git a/utils/rip_background.py b/utils/rip_background.py index 79d8a96..bfd6127 100644 --- a/utils/rip_background.py +++ b/utils/rip_background.py @@ -320,6 +320,15 @@ def bulk_download(track_list: list, quality: str = "FLAC"): } attempt = 0 + def _track_desc() -> str: + """Format track info for log messages.""" + title = track_info.get("title") or f"Track {track_id}" + artist = track_info.get("artist") or "Unknown" + album = track_info.get("album") or "" + if album: + return f"{track_id} - '{title}' by {artist} [{album}]" + return f"{track_id} - '{title}' by {artist}" + # Fetch metadata FIRST to check if track is available before attempting download md = None try: @@ -327,6 +336,12 @@ def bulk_download(track_list: list, quality: str = "FLAC"): md = await sr.get_metadata_by_track_id(track_id) or {} print(f"DEBUG: Metadata fetched: {bool(md)}") + # Populate track_info immediately so failure logs have useful info + if md: + track_info["title"] = md.get("title") or f"Track {track_id}" + track_info["artist"] = md.get("artist") or "Unknown Artist" + track_info["album"] = md.get("album") or "Unknown Album" + # Check if track is streamable if md and not md.get("streamable", True): print(f"TRACK {track_id}: Not streamable, skipping") @@ -564,7 +579,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"): break except aiohttp.ClientResponseError as e: - msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}" + msg = f"Track {_track_desc()} attempt {attempt} ClientResponseError: {e}" send_log_to_discord(msg, "WARNING", target) # If 429, backoff as before. If 5xx, recreate session and refresh Tidal client. if getattr(e, "status", None) == 429: @@ -581,7 +596,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"): try: await sr._force_fresh_login() send_log_to_discord( - f"Refreshed Tidal session after 5xx error on track {track_id}", + f"Refreshed Tidal session after 5xx error on track {_track_desc()}", "WARNING", target, ) @@ -625,7 +640,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"): if is_not_found: # Permanent failure - do not retry msg = ( - f"Track {track_id} not found/unavailable, skipping: {e}" + f"Track {_track_desc()} not found/unavailable, skipping: {e}" ) print(msg) send_log_to_discord(msg, "WARNING", target) @@ -634,7 +649,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"): break # Exit retry loop immediately elif is_5xx_error: msg = ( - f"Track {track_id} attempt {attempt} server error: {e}" + f"Track {_track_desc()} attempt {attempt} server error: {e}" ) send_log_to_discord(msg, "WARNING", target) track_info["error"] = err_str @@ -648,7 +663,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"): try: await sr._force_fresh_login() send_log_to_discord( - f"Refreshed Tidal session after 5xx error on track {track_id}", + f"Refreshed Tidal session after 5xx error on track {_track_desc()}", "WARNING", target, ) @@ -661,7 +676,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"): if attempt >= MAX_RETRIES: track_info["status"] = "Failed" send_log_to_discord( - f"Track {track_id} failed after {attempt} attempts (5xx)", + f"Track {_track_desc()} failed after {attempt} attempts (5xx)", "ERROR", target, ) @@ -670,13 +685,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"): ) elif is_no_stream_url: if attempt == 1 or attempt == MAX_RETRIES: - msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}" + msg = f"Track {_track_desc()} attempt {attempt} failed: {e}\n{tb}" send_log_to_discord(msg, "ERROR", target) track_info["error"] = str(e) if attempt >= MAX_RETRIES: track_info["status"] = "Failed" send_log_to_discord( - f"Track {track_id} failed after {attempt} attempts", + f"Track {_track_desc()} failed after {attempt} attempts", "ERROR", target, ) @@ -685,14 +700,14 @@ def bulk_download(track_list: list, quality: str = "FLAC"): ) else: msg = ( - f"Track {track_id} attempt {attempt} failed: {e}\n{tb}" + f"Track {_track_desc()} attempt {attempt} failed: {e}\n{tb}" ) send_log_to_discord(msg, "ERROR", target) track_info["error"] = str(e) if attempt >= MAX_RETRIES: track_info["status"] = "Failed" send_log_to_discord( - f"Track {track_id} failed after {attempt} attempts", + f"Track {_track_desc()} failed after {attempt} attempts", "ERROR", target, ) @@ -885,11 +900,367 @@ def bulk_download(track_list: list, quality: str = "FLAC"): loop.close() -# Correct integration of FLAC stream check -async def process_tracks(track_list): - for i, track_id in enumerate(track_list or []): - combined_path = f"/tmp/{uuid.uuid4().hex}_combined.m4s" # Example path - if not await check_flac_stream(combined_path): - logger.error(f"No FLAC stream found in {combined_path}. Skipping file.") - continue - # Proceed with decoding pipeline +# ---------- bulk_video_download ---------- +def bulk_video_download(video_list: list): + """ + RQ job for bulk video downloads: + - fetches video metadata and HLS streams + - downloads with ffmpeg in highest quality + - creates ONE tarball for all videos + - returns [tarball_path] + - sends relevant messages to Discord + """ + job = get_current_job() + job_id = job.id if job else uuid.uuid4().hex + target = job.meta.get("target") if job else None + staging_root = ROOT_DIR / f"video_{job_id}" + + if job: + try: + job.meta["video_ids"] = [str(v) for v in (video_list or [])] + job.meta["videos"] = [] + job.meta["progress"] = 0 + job.meta["tarball"] = None + job.meta["status"] = "Started" + job.save_meta() + except Exception as e: + send_log_to_discord(f"Failed to init job.meta: {e}", "WARNING", target) + + # Job started Discord message + asyncio.run( + discord_notify( + DISCORD_WEBHOOK, + title=f"Video Job Started: {job_id}", + description=f"Processing `{len(video_list)}` video(s)", + target=target, + color=0x00FFFF, + ) + ) + + async def process_videos(video_list): + per_video_meta = [] + all_final_files = [] + all_artists = set() + (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True) + + total = len(video_list or []) + for i, video_id in enumerate(video_list or []): + print(f"DEBUG: Processing video {i + 1}/{total}: {video_id}") + video_info = { + "video_id": str(video_id), + "title": None, + "artist": None, + "status": "Pending", + "file_path": None, + "filename": None, + "error": None, + "attempts": 0, + } + attempt = 0 + + def _video_desc() -> str: + """Format video info for log messages.""" + title = video_info.get("title") or f"Video {video_id}" + artist = video_info.get("artist") or "Unknown" + return f"{video_id} - '{title}' by {artist}" + + # Fetch metadata first + md = None + try: + print(f"DEBUG: Fetching metadata for video {video_id}") + md = await sr.get_video_metadata(video_id) + print(f"DEBUG: Metadata fetched: {bool(md)}") + + if md: + video_info["title"] = md.get("title") or f"Video {video_id}" + video_info["artist"] = md.get("artist") or "Unknown Artist" + + except Exception as meta_err: + print(f"VIDEO {video_id}: Metadata fetch failed: {meta_err}") + md = None + + while attempt < MAX_RETRIES: + attempt += 1 + video_info["attempts"] = attempt + + try: + # Use sr.download_video which handles HLS and quality selection + print(f"VIDEO {video_id}: Starting download (attempt {attempt})") + + # Download to temporary location + tmp_dir = Path(f"/tmp/video_{uuid.uuid4().hex}") + tmp_dir.mkdir(parents=True, exist_ok=True) + + # Add timeout for video download (30 minutes max per video) + try: + file_path = await asyncio.wait_for( + sr.download_video(video_id, str(tmp_dir)), + timeout=1800 # 30 minutes + ) + except asyncio.TimeoutError: + print(f"VIDEO {video_id}: Download timed out after 30 minutes") + raise RuntimeError("Download timed out after 30 minutes") + + if not file_path or not Path(file_path).exists(): + raise RuntimeError("Download completed but no file created") + + # If we didn't get metadata earlier, try again + if not md: + try: + md = await sr.get_video_metadata(video_id) + except Exception: + md = {} + + md = md or {} + artist_raw = md.get("artist") or "Unknown Artist" + title_raw = md.get("title") or f"Video {video_id}" + + artist = sanitize_filename(artist_raw) + title = sanitize_filename(title_raw) + + video_info["title"] = title + video_info["artist"] = artist + + print(f"VIDEO {video_id}: Processing '{title}' by {artist}") + + all_artists.add(artist) + video_dir = staging_root / artist + video_dir.mkdir(parents=True, exist_ok=True) + final_file = ensure_unique_path(video_dir / f"{title}.mp4") + + # Move to final location + print(f"VIDEO {video_id}: Moving to final location...") + shutil.move(str(file_path), str(final_file)) + + # Clean up temp dir + try: + shutil.rmtree(tmp_dir, ignore_errors=True) + except Exception: + pass + + print(f"VIDEO {video_id}: File moved successfully") + + # Success + video_info["status"] = "Success" + video_info["file_path"] = str(final_file) + try: + video_info["filename"] = final_file.name + except Exception: + video_info["filename"] = None + video_info["error"] = None + all_final_files.append(final_file) + + print( + f"VIDEO {video_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%" + ) + + # Throttle + await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) + + if job: + job.meta["progress"] = int(((i + 1) / total) * 100) + job.meta["videos"] = per_video_meta + [video_info] + job.save_meta() + break + + except Exception as e: + tb = traceback.format_exc() + err_str = str(e).lower() + + is_not_found = any( + phrase in err_str + for phrase in ( + "video not found", + "not found", + "404", + "does not exist", + "no longer available", + ) + ) + + if is_not_found: + msg = f"Video {_video_desc()} not found/unavailable, skipping: {e}" + print(msg) + send_log_to_discord(msg, "WARNING", target) + video_info["status"] = "Failed" + video_info["error"] = str(e) + break + else: + msg = f"Video {_video_desc()} attempt {attempt} failed: {e}\n{tb}" + send_log_to_discord(msg, "ERROR", target) + video_info["error"] = str(e) + if attempt >= MAX_RETRIES: + video_info["status"] = "Failed" + send_log_to_discord( + f"Video {_video_desc()} failed after {attempt} attempts", + "ERROR", + target, + ) + await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) + + # Ensure placeholders for job metadata + video_info["title"] = video_info.get("title") or f"Video {video_id}" + video_info["artist"] = video_info.get("artist") or "Unknown Artist" + if video_info.get("file_path") and not video_info.get("filename"): + try: + video_info["filename"] = Path(video_info["file_path"]).name + except Exception: + video_info["filename"] = None + per_video_meta.append(video_info) + + if not all_final_files: + if job: + job.meta["tarball"] = None + job.meta["status"] = "Failed" + job.save_meta() + send_log_to_discord( + f"No videos were successfully downloaded for job `{job_id}`", + "CRITICAL", + target, + ) + return [] + + # Tarball creation + artist_counts = {} + for v in per_video_meta: + if v["status"] == "Success" and v.get("file_path"): + try: + artist = Path(v["file_path"]).relative_to(staging_root).parts[0] + except Exception: + artist = "Unknown Artist" + artist_counts[artist] = artist_counts.get(artist, 0) + 1 + top_artist = ( + sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0] + if artist_counts + else "Unknown Artist" + ) + + target_name = None + try: + if job and job.meta: + target_name = job.meta.get("target") + except Exception: + target_name = None + + base_label = ( + sanitize_filename(target_name) + if target_name + else sanitize_filename(top_artist) + ) + staged_tarball = staging_root / f"{base_label}_videos.tar.gz" + + counter = 1 + base_name = staged_tarball.stem + while staged_tarball.exists(): + counter += 1 + staged_tarball = staging_root / f"{base_name} ({counter}).tar.gz" + + final_dir = Path("/storage/music/TRIP/videos") + final_dir.mkdir(parents=True, exist_ok=True) + final_tarball = ensure_unique_filename_in_dir(final_dir, staged_tarball.name) + + if job: + job.meta["status"] = "Compressing" + job.save_meta() + + logging.info("Creating video tarball: %s", staged_tarball) + await discord_notify( + DISCORD_WEBHOOK, + title=f"Compressing: Video Job {job_id}", + description=f"Creating tarball: `{len(all_final_files)}` video(s).\nStaging path: {staged_tarball}", + color=0xFFA500, + target=target, + ) + try: + subprocess.run( + [ + "tar", + "-I", + "pigz -9", + "-cf", + str(staged_tarball), + "-C", + str(staging_root), + ] + + [str(f.relative_to(staging_root)) for f in all_final_files], + check=True, + ) + for f in all_final_files: + try: + os.remove(f) + except Exception: + pass + except FileNotFoundError: + send_log_to_discord( + "pigz not available, falling back to tarfile (slower).", + "WARNING", + target, + ) + with tarfile.open(staged_tarball, "w:gz") as tar: + for f in all_final_files: + try: + arcname = f.relative_to(staging_root) + except ValueError: + arcname = f.name + tar.add(f, arcname=str(arcname)) + try: + os.remove(f) + except Exception: + pass + except Exception as e: + send_log_to_discord(f"Video tar creation failed: {e}", "ERROR", target) + if job: + job.meta["status"] = "compress_failed" + job.save_meta() + return [] + + if not staged_tarball.exists(): + send_log_to_discord( + f"Video tarball was not created: `{staged_tarball}`", "CRITICAL", target + ) + if job: + job.meta["status"] = "compress_failed" + job.save_meta() + return [] + + try: + staged_tarball.rename(final_tarball) + except Exception: + shutil.move(str(staged_tarball), str(final_tarball)) + + await asyncio.to_thread(shutil.rmtree, staging_root, ignore_errors=True) + + if job: + job.meta["tarball"] = str(final_tarball) + job.meta["progress"] = 100 + job.meta["status"] = "Completed" + job.save_meta() + + # Job completed Discord message + completed = len(all_final_files) + failed = len(video_list) - completed + await discord_notify( + DISCORD_WEBHOOK, + title=f"Video Job Completed: {job_id}", + description=f"Processed `{len(video_list)}` video(s).\nCompleted: `{completed}`\nFailed: `{failed}`\nTarball: `{final_tarball}`", + target=target, + color=0x00FF00, + ) + + logging.info("Video job %s finished, tarball: %s", job_id, final_tarball) + + return [str(final_tarball)] + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(process_videos(video_list)) + except Exception as e: + send_log_to_discord( + f"bulk_video_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target + ) + if job: + job.meta["status"] = "Failed" + job.save_meta() + finally: + loop.close() diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py index 07c0327..3fc23e5 100644 --- a/utils/sr_wrapper.py +++ b/utils/sr_wrapper.py @@ -12,12 +12,19 @@ import json import os import aiohttp import time +import base64 # Monkey-patch streamrip's Tidal client credentials BEFORE importing TidalClient import streamrip.client.tidal as _tidal_module # type: ignore # noqa: E402 -_tidal_module.CLIENT_ID = "fX2JxdmntZWK0ixT" -_tidal_module.CLIENT_SECRET = "1Nn9AfDAjxrgJFJbKNWLeAyKGVGmINuXPPLHVXAvxAg=" +CLIENT_ID = base64.b64decode("ZlgySnhkbW50WldLMGl4VA==").decode("iso-8859-1") +CLIENT_SECRET = base64.b64decode( + "MU5tNUFmREFqeHJnSkZKYktOV0xlQXlLR1ZHbUlOdVhQUExIVlhBdnhBZz0=", +).decode("iso-8859-1") + +_tidal_module.CLIENT_ID = CLIENT_ID + +_tidal_module.CLIENT_SECRET = CLIENT_SECRET _tidal_module.AUTH = aiohttp.BasicAuth( login=_tidal_module.CLIENT_ID, password=_tidal_module.CLIENT_SECRET ) @@ -306,14 +313,21 @@ class SRUtil: if not token_expiry: return True # No expiry info means we should refresh try: - # token_expiry is typically an ISO timestamp string - if isinstance(token_expiry, str): - from datetime import datetime - - expiry_dt = datetime.fromisoformat(token_expiry.replace("Z", "+00:00")) - expiry_ts = expiry_dt.timestamp() - else: + # token_expiry can be a Unix timestamp (float/int/string) or ISO string + if not isinstance(token_expiry, str): expiry_ts = float(token_expiry) + else: + # Try parsing as a numeric Unix timestamp first + try: + expiry_ts = float(token_expiry) + except ValueError: + # Fall back to ISO format string + from datetime import datetime + + expiry_dt = datetime.fromisoformat( + token_expiry.replace("Z", "+00:00") + ) + expiry_ts = expiry_dt.timestamp() return expiry_ts < (time.time() + TIDAL_TOKEN_REFRESH_BUFFER) except Exception as e: logging.warning("Failed to parse token expiry '%s': %s", token_expiry, e) @@ -1167,6 +1181,459 @@ class SRUtil: logging.critical("Error: %s", str(e)) return False + # ========================================================================= + # Video Support + # ========================================================================= + + async def search_videos(self, query: str, limit: int = 50) -> Optional[list[dict]]: + """Search for videos by query string. + + Args: + query: Search query (artist name, song title, etc.) + limit: Maximum number of results to return. + + Returns: + List of video results with id, title, artist, duration, etc. + """ + max_retries = 4 + delay = 1.0 + for attempt in range(max_retries): + try: + results = await self._safe_api_call( + self.streamrip_client.search, + media_type="video", + query=query, + limit=limit, + retries=3, + ) + break + except Exception as e: + msg = str(e) + if ("400" in msg or "429" in msg) and attempt < max_retries - 1: + await asyncio.sleep(delay) + delay *= 2 + continue + else: + logging.warning("Video search failed: %s", e) + return None + else: + return None + + if not results: + return None + + # Results can be paged - get items from first page + if isinstance(results, list): + results_page = results[0] if results else {} + else: + results_page = results + + items = results_page.get("items", []) if isinstance(results_page, dict) else [] + if not items: + return None + + videos_out = [] + for item in items: + artist_info = item.get("artist") or item.get("artists", [{}])[0] if item.get("artists") else {} + artist_name = artist_info.get("name", "Unknown Artist") if isinstance(artist_info, dict) else str(artist_info) + + videos_out.append({ + "id": item.get("id"), + "title": item.get("title"), + "artist": artist_name, + "duration": item.get("duration"), + "duration_formatted": self.format_duration(item.get("duration")), + "release_date": item.get("releaseDate"), + "image_id": item.get("imageId"), + "image_url": ( + f"https://resources.tidal.com/images/{item.get('imageId').replace('-', '/')}/640x360.jpg" + if item.get("imageId") + else None + ), + "quality": item.get("quality"), + }) + + return videos_out + + async def get_video_metadata(self, video_id: int) -> Optional[dict]: + """Get metadata for a specific video by ID. + + Args: + video_id: The Tidal video ID. + + Returns: + Video metadata dict or None if not found. + """ + video_id_str = str(video_id) + try: + metadata = await self._safe_api_call( + self.streamrip_client.get_metadata, + item_id=video_id_str, + media_type="video", + retries=3, + ) + except Exception as e: + logging.warning("get_video_metadata failed for %s: %s", video_id, e) + return None + + if not metadata: + return None + + artist_info = metadata.get("artist") or (metadata.get("artists", [{}])[0] if metadata.get("artists") else {}) + artist_name = artist_info.get("name", "Unknown Artist") if isinstance(artist_info, dict) else str(artist_info) + + return { + "id": metadata.get("id"), + "title": metadata.get("title"), + "artist": artist_name, + "artists": [a.get("name") for a in metadata.get("artists", [])], + "duration": metadata.get("duration"), + "duration_formatted": self.format_duration(metadata.get("duration")), + "release_date": metadata.get("releaseDate"), + "image_id": metadata.get("imageId"), + "image_url": ( + f"https://resources.tidal.com/images/{metadata.get('imageId').replace('-', '/')}/1280x720.jpg" + if metadata.get("imageId") + else None + ), + "thumbnail_url": ( + f"https://resources.tidal.com/images/{metadata.get('imageId').replace('-', '/')}/640x360.jpg" + if metadata.get("imageId") + else None + ), + "quality": metadata.get("quality"), + "explicit": metadata.get("explicit"), + "album": metadata.get("album", {}).get("title") if metadata.get("album") else None, + "album_id": metadata.get("album", {}).get("id") if metadata.get("album") else None, + } + + async def get_video_stream_url(self, video_id: int) -> Optional[str]: + """Get the HLS stream URL for a video. + + Args: + video_id: The Tidal video ID. + + Returns: + The highest quality video HLS variant URL (.m3u8) or None if not available. + """ + video_id_str = str(video_id) + logging.info("VIDEO %s: Fetching stream URL...", video_id) + try: + # First try the standard streamrip method + logging.info("VIDEO %s: Trying streamrip get_video_file_url...", video_id) + url = await self._safe_api_call( + self.streamrip_client.get_video_file_url, + video_id=video_id_str, + retries=2, + ) + if url: + logging.info("VIDEO %s: Got stream URL via streamrip", video_id) + return url if url else None + except Exception as e: + # Streamrip's get_video_file_url may fail if Tidal returns HLS manifest + # directly instead of a JSON with URLs. Try to get the manifest URL directly. + err_msg = str(e) + logging.info("VIDEO %s: streamrip method failed (%s), trying fallback...", video_id, err_msg[:100]) + if "mpegurl" in err_msg.lower() or ".m3u8" in err_msg: + # Extract the master manifest URL from the error message + import re + m3u8_match = re.search(r"(https://[^\s'\"]+\.m3u8[^\s'\"]*)", err_msg) + if m3u8_match: + master_url = m3u8_match.group(1) + logging.info("VIDEO %s: Extracted HLS master URL from error", video_id) + # Try to get the highest quality variant from the master playlist + best_url = await self._get_best_variant_from_master(master_url) + return best_url or master_url + + # Fall back to fetching the manifest URL directly from Tidal API + try: + logging.info("VIDEO %s: Trying direct API manifest fetch...", video_id) + result = await self._get_video_manifest_url(video_id_str) + if result: + logging.info("VIDEO %s: Got stream URL via direct API", video_id) + return result + except Exception as e2: + logging.warning("get_video_stream_url failed for %s: %s (fallback: %s)", video_id, e, e2) + return None + + async def _get_best_variant_from_master(self, master_url: str) -> Optional[str]: + """Parse HLS master playlist and return the highest quality variant URL.""" + import re + + try: + # Ensure we have a session + if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session: + self.streamrip_client.session = await self.streamrip_client.get_session() + + async with self.streamrip_client.session.get(master_url) as resp: + if resp.status != 200: + return None + playlist_text = await resp.text() + + # Parse HLS master playlist for variant streams + stream_pattern = re.compile( + r'#EXT-X-STREAM-INF:.*?BANDWIDTH=(\d+).*?\n([^\n#]+)', + re.MULTILINE + ) + matches = stream_pattern.findall(playlist_text) + + if matches: + # Sort by bandwidth (highest quality = highest bandwidth) + matches.sort(key=lambda x: int(x[0]), reverse=True) + best_variant = matches[0][1].strip() + + # If it's a relative URL, make it absolute + if not best_variant.startswith('http'): + base_url = master_url.rsplit('/', 1)[0] + best_variant = f"{base_url}/{best_variant}" + + logging.info("Selected highest quality variant: bandwidth=%s", matches[0][0]) + return best_variant + except Exception as e: + logging.warning("Failed to parse HLS master playlist: %s", e) + + return None + + async def _get_video_manifest_url(self, video_id: str) -> Optional[str]: + """Directly fetch the HLS manifest URL from Tidal API. + + This is a fallback when streamrip's method fails due to format changes. + Returns the highest quality variant URL from the HLS master playlist. + """ + import base64 + import re + + params = { + "videoquality": "HIGH", + "playbackmode": "STREAM", + "assetpresentation": "FULL", + } + + # Ensure we have a session + if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session: + self.streamrip_client.session = await self.streamrip_client.get_session() + + # Make the API request + resp = await self.streamrip_client._api_request( + f"videos/{video_id}/playbackinfopostpaywall", params=params + ) + + if not resp or "manifest" not in resp: + return None + + # Decode the manifest + manifest_data = json.loads(base64.b64decode(resp["manifest"]).decode("utf-8")) + + # The manifest should contain URLs - get the master playlist URL + urls = manifest_data.get("urls", []) + if not urls: + return None + + master_url = urls[0] + + # Try to fetch the master playlist and find the highest quality variant + try: + async with self.streamrip_client.session.get(master_url) as resp: + if resp.status == 200: + playlist_text = await resp.text() + + # Parse HLS master playlist for variant streams + # Look for lines like: #EXT-X-STREAM-INF:BANDWIDTH=...,RESOLUTION=1920x1080 + # followed by the variant URL + stream_pattern = re.compile( + r'#EXT-X-STREAM-INF:.*?BANDWIDTH=(\d+).*?\n([^\n#]+)', + re.MULTILINE + ) + matches = stream_pattern.findall(playlist_text) + + if matches: + # Sort by bandwidth (highest quality = highest bandwidth) + matches.sort(key=lambda x: int(x[0]), reverse=True) + best_variant = matches[0][1].strip() + + # If it's a relative URL, make it absolute + if not best_variant.startswith('http'): + base_url = master_url.rsplit('/', 1)[0] + best_variant = f"{base_url}/{best_variant}" + + logging.info("Selected highest quality video variant: bandwidth=%s", matches[0][0]) + return best_variant + except Exception as e: + logging.warning("Failed to parse HLS master playlist: %s", e) + + # Fall back to returning the master URL (ffmpeg will pick a variant) + return master_url + + async def download_video(self, video_id: int, output_path: Optional[str] = None) -> Optional[str]: + """Download a video by ID. + + Args: + video_id: The Tidal video ID. + output_path: Optional path to save the video. Can be a directory or full file path. + If not provided, a temp path is used. + + Returns: + The path to the downloaded video file, or None on failure. + """ + try: + logging.info("VIDEO %s: Getting stream URL...", video_id) + video_url = await self.get_video_stream_url(video_id) + if not video_url: + logging.warning("No video URL for video ID: %s", video_id) + return None + + logging.info("VIDEO %s: Got stream URL, preparing download...", video_id) + + # Determine output path + if not output_path: + unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16] + dl_folder_path = f"{self.streamrip_config.session.downloads.folder}/{unique}" + try: + os.makedirs(dl_folder_path, exist_ok=True) + except Exception: + pass + output_path = f"{dl_folder_path}/{video_id}.mp4" + elif os.path.isdir(output_path): + # If output_path is a directory, append the video filename + output_path = os.path.join(output_path, f"{video_id}.mp4") + + # Video URLs are HLS manifests - use ffmpeg to download + logging.info("VIDEO %s: Starting ffmpeg HLS download to %s", video_id, output_path) + print(f"VIDEO {video_id}: Starting ffmpeg download...") + + cmd = [ + "ffmpeg", + "-nostdin", # Don't read from stdin - prevents SIGTTIN in background + "-hide_banner", + "-loglevel", "warning", + "-analyzeduration", "10M", + "-probesize", "10M", + "-i", video_url, + "-c:v", "copy", + "-c:a", "aac", + "-b:a", "256k", + "-af", "aresample=async=1:first_pts=0", + "-y", + output_path, + ] + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE, + ) + + # Use communicate() to avoid buffer deadlocks + _, stderr = await proc.communicate() + + if proc.returncode != 0: + stderr_text = stderr.decode().strip() if stderr else "Unknown error" + logging.error("ffmpeg video download failed for %s: %s", video_id, stderr_text) + return None + + print(f"VIDEO {video_id}: ffmpeg completed, verifying file...") + + if not os.path.exists(output_path) or os.path.getsize(output_path) == 0: + logging.error("Video download completed but file missing or empty") + return None + + # Verify the MP4 is valid (has moov atom) + verify_cmd = [ + "ffprobe", + "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + output_path, + ] + verify_proc = await asyncio.create_subprocess_exec( + *verify_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + verify_stdout, verify_stderr = await verify_proc.communicate() + + if verify_proc.returncode != 0: + stderr_text = verify_stderr.decode().strip() if verify_stderr else "" + logging.error("Downloaded video is corrupt (moov atom missing?): %s", stderr_text) + # Clean up corrupt file + try: + os.remove(output_path) + except Exception: + pass + return None + + duration = verify_stdout.decode().strip() if verify_stdout else "unknown" + logging.info("Video %s downloaded to %s (%d bytes, duration: %ss)", + video_id, output_path, os.path.getsize(output_path), duration) + return output_path + + except Exception as e: + logging.critical("Video download error for %s: %s", video_id, e) + return None + + return None # Should not reach here, but satisfy type checker + + async def get_videos_by_artist_id(self, artist_id: int, limit: int = 50) -> Optional[list[dict]]: + """Get videos by artist ID. + + Args: + artist_id: The Tidal artist ID. + limit: Maximum number of videos to return. + + Returns: + List of videos by the artist or None if not found. + """ + artist_id_str = str(artist_id) + + # Ensure we have a session + if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session: + self.streamrip_client.session = await self.streamrip_client.get_session() + + try: + # Use the direct Tidal API endpoint for artist videos + resp = await self._safe_api_call( + self.streamrip_client._api_request, + f"artists/{artist_id_str}/videos", + params={"limit": limit, "offset": 0}, + retries=3, + ) + except Exception as e: + logging.warning("get_videos_by_artist_id API call failed: %s", e) + return None + + if not resp: + return None + + # The response has an "items" array + videos = resp.get("items", []) + if not videos: + return None + + videos_out = [] + for video in videos: + artist_info = video.get("artist") or (video.get("artists", [{}])[0] if video.get("artists") else {}) + artist_name = artist_info.get("name", "Unknown Artist") if isinstance(artist_info, dict) else str(artist_info) + + videos_out.append({ + "id": video.get("id"), + "title": video.get("title"), + "artist": artist_name, + "duration": video.get("duration"), + "duration_formatted": self.format_duration(video.get("duration")), + "release_date": video.get("releaseDate"), + "image_id": video.get("imageId"), + "image_url": ( + f"https://resources.tidal.com/images/{video.get('imageId').replace('-', '/')}/640x360.jpg" + if video.get("imageId") + else None + ), + }) + + return videos_out + + return videos_out + async def get_lrc_by_track_id(self, track_id: int) -> Optional[str]: """Get LRC lyrics by track ID.""" logging.debug(f"SR: Fetching metadata for track ID {track_id}")