diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 0000000..e69de29 diff --git a/endpoints/lighting.py b/endpoints/lighting.py index 503f3ee..e7d36a1 100644 --- a/endpoints/lighting.py +++ b/endpoints/lighting.py @@ -3,6 +3,7 @@ import json import os import time import aiohttp +import asyncio from fastapi import FastAPI, Depends, HTTPException, Request from fastapi_throttle import RateLimiter from fastapi.responses import JSONResponse @@ -14,6 +15,8 @@ from pycync.user import User # type: ignore from pycync.cync import Cync as Cync # type: ignore from pycync import Auth # type: ignore from pycync.exceptions import TwoFactorRequiredError, AuthFailedError # type: ignore +import inspect +import getpass class Lighting(FastAPI): @@ -38,41 +41,59 @@ class Lighting(FastAPI): # Check if session is closed or missing if not self.session or getattr(self.session, "closed", False): self.session = aiohttp.ClientSession() + # Load cached token and check validity + self.cync_user = None cached_user = self._load_cached_user() + token_status = None if cached_user: + if hasattr(cached_user, "expires_at"): + if cached_user.expires_at > time.time(): + token_status = "valid" + else: + token_status = "expired" + else: + token_status = "missing expires_at" + else: + token_status = "no cached user" + logging.info(f"Cync token status: {token_status}") + + if token_status == "valid" and cached_user is not None: + # Use cached token self.auth = Auth( session=self.session, user=cached_user, username=cync_email, password=cync_password, ) + self.cync_user = cached_user + logging.info("Reusing valid cached token, no 2FA required.") else: + # Need fresh login self.auth = Auth( - session=self.session, username=cync_email, password=cync_password + session=self.session, + username=cync_email, + password=cync_password, ) - # Try to refresh token - self.cync_user = None - if ( - self.auth.user - and hasattr(self.auth.user, "expires_at") - and self.auth.user.expires_at > time.time() - ): - try: - await self.auth.async_refresh_user_token() - self.cync_user = self.auth.user - self._save_cached_user(self.cync_user) - except AuthFailedError: - pass - # If no valid token, login - if not self.cync_user: try: self.cync_user = await self.auth.login() self._save_cached_user(self.cync_user) except TwoFactorRequiredError: - logging.error( - "Cync 2FA required. Set CYNC_2FA_CODE in env if needed." - ) - raise Exception("Cync 2FA required.") + twofa_code = os.getenv("CYNC_2FA_CODE") + if not twofa_code: + print("Cync 2FA required. 
Please enter your code:")
+                    twofa_code = getpass.getpass("2FA Code: ")
+                if twofa_code:
+                    logging.info("Retrying Cync login with 2FA code.")
+                    try:
+                        self.cync_user = await self.auth.login(two_factor_code=twofa_code)
+                        self._save_cached_user(self.cync_user)
+                        logging.info("Logged in with 2FA successfully.")
+                    except Exception as e:
+                        logging.error("Cync 2FA login failed: %s", e)
+                        raise Exception("Cync 2FA code invalid or not accepted.")
+                else:
+                    logging.error("Cync 2FA required but no code provided.")
+                    raise Exception("Cync 2FA required.")
             except AuthFailedError as e:
                 logging.error("Failed to authenticate with Cync API: %s", e)
                 raise Exception("Cync authentication failed.")
@@ -125,55 +146,23 @@ class Lighting(FastAPI):
                 f"Missing required environment variables: {', '.join(missing_vars)}"
             )
 
-        self.session = aiohttp.ClientSession()
-        cached_user = self._load_cached_user()
-        if cached_user:
-            self.auth = Auth(
-                session=self.session,
-                user=cached_user,
-                username=self.cync_email or "",
-                password=self.cync_password or "",
-            )
-        else:
-            self.auth = Auth(
-                session=self.session,
-                username=self.cync_email or "",
-                password=self.cync_password or "",
-            )
-        # Try to refresh token
-        if (
-            self.auth.user
-            and hasattr(self.auth.user, "expires_at")
-            and self.auth.user.expires_at > time.time()
-        ):
-            try:
-                await self.auth.async_refresh_user_token()
-                self.cync_user = self.auth.user
-                self._save_cached_user(self.cync_user)
-            except AuthFailedError:
-                pass
-        # If no valid token, login
-        if not self.cync_user:
-            try:
-                self.cync_user = await self.auth.login()
-                self._save_cached_user(self.cync_user)
-            except TwoFactorRequiredError:
-                logging.error(
-                    "Cync 2FA required. Set CYNC_2FA_CODE in env if needed."
-                )
-                raise Exception("Cync 2FA required.")
-            except AuthFailedError as e:
-                logging.error("Failed to authenticate with Cync API: %s", e)
-                raise Exception("Cync authentication failed.")
-        # Create persistent Cync API object
-        self.cync_api = await Cync.create(self.auth)
+        # Use ensure_cync_connection, which handles token caching and 2FA
+        await self.ensure_cync_connection()
+        # Create persistent Cync API object
+        if self.auth:
+            self.cync_api = await Cync.create(self.auth)
+
+        # Schedule periodic token validation
+        asyncio.create_task(self._schedule_token_validation())
+
+        # Register endpoints
         self.endpoints: dict = {
             "lighting/state": self.get_lighting_state,
         }
 
         for endpoint, handler in self.endpoints.items():
-            app.add_api_route(
+            self.app.add_api_route(
                 f"/{endpoint}",
                 handler,
                 methods=["GET"],
@@ -184,7 +173,7 @@ class Lighting(FastAPI):
                 ],
             )
 
-        app.add_api_route(
+        self.app.add_api_route(
             "/lighting/state",
             self.set_lighting_state,
             methods=["POST"],
@@ -195,6 +184,76 @@ class Lighting(FastAPI):
             ],
         )
 
+    async def _refresh_or_login(self):
+        if not self.auth:
+            logging.error("Auth object is not initialized.")
+            raise Exception("Cync authentication not initialized.")
+        try:
+            user = getattr(self.auth, 'user', None)
+            if user and hasattr(user, "expires_at") and user.expires_at > time.time():
+                refresh = getattr(self.auth, 'async_refresh_user_token', None)
+                if callable(refresh):
+                    try:
+                        result = refresh()
+                        if inspect.isawaitable(result):
+                            await result
+                        self.cync_user = self.auth.user
+                        self._save_cached_user(self.cync_user)
+                        return  # token refreshed; no full login needed
+                    except AuthFailedError as e:
+                        logging.warning("Token refresh failed: %s", e)
+            login = getattr(self.auth, 'login', None)
+            if callable(login):
+                try:
+                    result = login()
+                    if inspect.isawaitable(result):
+                        self.cync_user = await result
+                    else:
+                        self.cync_user = result
+                    self._save_cached_user(self.cync_user)
+                    logging.info("Logged in successfully.")
+                except TwoFactorRequiredError:
+                    twofa_code = os.getenv("CYNC_2FA_CODE")
+                    if not twofa_code:
+                        # Prompt interactively if not set
+                        print("Cync 2FA required. Please enter your code:")
+                        twofa_code = getpass.getpass("2FA Code: ")
+                    if twofa_code:
+                        logging.info("Retrying Cync login with 2FA code.")
+                        try:
+                            result = login(two_factor_code=twofa_code)
+                            if inspect.isawaitable(result):
+                                self.cync_user = await result
+                            else:
+                                self.cync_user = result
+                            self._save_cached_user(self.cync_user)
+                            logging.info("Logged in with 2FA successfully.")
+                        except Exception as e:
+                            logging.error("Cync 2FA login failed: %s", e)
+                            raise Exception("Cync 2FA code invalid or not accepted.")
+                    else:
+                        logging.error("Cync 2FA required but no code provided.")
+                        raise Exception("Cync 2FA required.")
+            else:
+                raise Exception("Auth object missing login method.")
+        except AuthFailedError as e:
+            logging.error("Failed to authenticate with Cync API: %s", e)
+            raise Exception("Cync authentication failed.")
+        except Exception as e:
+            logging.error("Unexpected error during authentication: %s", e)
+            raise
+
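Note on the pattern above: `_refresh_or_login` probes `Auth` with `getattr` and awaits a result only when `inspect.isawaitable` says so, so it works whether pycync exposes these methods as coroutines or plain callables. A minimal, self-contained sketch of that pattern (the helper name is illustrative, not part of pycync):

```python
import asyncio
import inspect

async def call_maybe_async(fn, *args, **kwargs):
    """Call fn and await the result only if it is awaitable."""
    result = fn(*args, **kwargs)
    if inspect.isawaitable(result):
        return await result
    return result

async def demo():
    print(await call_maybe_async(lambda: 42))           # plain callable
    print(await call_maybe_async(asyncio.sleep, 0, 7))  # coroutine function

asyncio.run(demo())
```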
+    async def _schedule_token_validation(self):
+        while True:
+            try:
+                await asyncio.sleep(300)
+                user = getattr(self.auth, 'user', None)
+                if user and hasattr(user, "expires_at") and user.expires_at - time.time() < 600:
+                    logging.info("Token is about to expire. Refreshing...")
+                    await self._refresh_or_login()
+            except Exception as e:
+                logging.error("Error during periodic token validation: %s", e)
 
     def _load_cached_user(self):
         try:
             if os.path.exists(self.token_cache_path):
@@ -253,8 +311,10 @@ class Lighting(FastAPI):
         """
         Set the lighting state and apply it to the Cync device.
         """
+        logging.info("=== LIGHTING STATE REQUEST RECEIVED ===")
         try:
             state = await request.json()
+            logging.info(f"Requested state: {state}")
             # Validate state (basic validation)
             if not isinstance(state, dict):
                 raise HTTPException(
@@ -302,10 +362,14 @@ class Lighting(FastAPI):
             try:
                 # Use persistent Cync API object
                 if not self.cync_api:
-                    raise Exception("Cync API not initialized.")
+                    logging.warning("Cync API not initialized, attempting to reconnect...")
+                    await self.ensure_cync_connection()
+                    if not self.cync_api:
+                        raise Exception("Cync API still not initialized after reconnection.")
                 devices = self.cync_api.get_devices()
                 if not devices or not isinstance(devices, (list, tuple)):
                     raise Exception("No devices returned from Cync API.")
+                logging.info(f"Devices returned from Cync API: {[getattr(d, 'name', None) for d in devices]}")
                 light = next(
                     (
                         d
@@ -315,27 +379,30 @@ class Lighting(FastAPI):
                     None,
                 )
                 if not light:
+                    logging.error(f"Device '{self.cync_device_name}' not found in {[getattr(d, 'name', None) for d in devices]}")
                     raise Exception(f"Device '{self.cync_device_name}' not found")
-
+                logging.info(f"Selected device: {light}")
                 # Set power
                 if power == "on":
-                    await light.turn_on()
+                    result = await light.turn_on()
+                    logging.info(f"turn_on result: {result}")
                 else:
-                    await light.turn_off()
-
+                    result = await light.turn_off()
+                    logging.info(f"turn_off result: {result}")
                 # Set brightness
                 if "brightness" in state:
-                    await light.set_brightness(brightness)
+                    result = await light.set_brightness(brightness)
+                    logging.info(f"set_brightness result: {result}")
                 # Set color
                 if rgb:
-                    await light.set_rgb(rgb)
+                    result = await light.set_rgb(rgb)
+                    logging.info(f"set_rgb result: {result}")
 
                 break  # Success, exit retry loop
-
-            
except Exception as e: + except (aiohttp.ClientConnectionError, aiohttp.ClientOSError) as e: if attempt < max_retries - 1: logging.warning( - "Device operation failed (attempt %d/%d): %s. Retrying with reconnection.", + "Connection closed (attempt %d/%d): %s. Retrying with reconnection.", attempt + 1, max_retries, e, @@ -343,11 +410,20 @@ class Lighting(FastAPI): await self.ensure_cync_connection() else: logging.error( - "Device operation failed after %d attempts: %s", + "Connection failed after %d attempts: %s", max_retries, e, ) raise + except Exception as e: + logging.error("Unexpected error during device operation: %s", e) + logging.error("Error type: %s", type(e).__name__) + # Try to reconnect on any error for next attempt + if attempt < max_retries - 1: + logging.warning("Attempting reconnection due to error...") + await self.ensure_cync_connection() + else: + raise logging.info( "Successfully applied state to device '%s': %s", diff --git a/utils/rip_background.py b/utils/rip_background.py index b21ac48..5ea51d0 100644 --- a/utils/rip_background.py +++ b/utils/rip_background.py @@ -45,6 +45,21 @@ load_dotenv() sr = SRUtil() +logger = logging.getLogger(__name__) + + +async def check_flac_stream(file_path): + """Check if the given file contains a FLAC stream using ffprobe.""" + cmd = [ + "ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", + "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1", file_path + ] + process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + stdout, _ = await process.communicate() + return b"flac" in stdout + # ---------- Discord helper ---------- async def discord_notify( @@ -259,131 +274,144 @@ def bulk_download(track_list: list, quality: str = "FLAC"): ) ) - async def process_tracks(): + async def process_tracks(track_list): per_track_meta = [] all_final_files = [] all_artists = set() (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True) + # Ensure aiohttp session is properly closed async with aiohttp.ClientSession(headers=HEADERS) as session: - # Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil - async def _rate_limit_notify(exc: Exception): + print(f"DEBUG: Starting process_tracks with {len(track_list)} tracks") + # Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil + async def _rate_limit_notify(exc: Exception): + try: + send_log_to_discord( + f"Rate limit observed while fetching metadata: {exc}", + "WARNING", + target, + ) + except Exception: + pass + + # attach callback and reset notified flag for this job run try: - send_log_to_discord( - f"Rate limit observed while fetching metadata: {exc}", - "WARNING", - target, - ) + sr.on_rate_limit = _rate_limit_notify + sr._rate_limit_notified = False except Exception: pass + total = len(track_list or []) + for i, track_id in enumerate(track_list or []): + print(f"DEBUG: Processing track {i+1}/{total}: {track_id}") + track_info = { + "track_id": str(track_id), + "status": "Pending", + "file_path": None, + "error": None, + "attempts": 0, + } + attempt = 0 - # attach callback and reset notified flag for this job run - try: - sr.on_rate_limit = _rate_limit_notify - sr._rate_limit_notified = False - except Exception: - pass - total = len(track_list or []) - for i, track_id in enumerate(track_list or []): - track_info = { - "track_id": str(track_id), - "status": "Pending", - "file_path": None, - "error": None, - "attempts": 0, - } - attempt = 0 - - while 
attempt < MAX_RETRIES: - tmp_file = None - attempt += 1 - track_info["attempts"] = attempt - - try: - sr.get_cover_by_album_id - url = await sr.get_stream_url_by_track_id(track_id, quality) - if not url: - raise RuntimeError("No stream URL") - - parsed = urlparse(url) - clean_path = unquote(parsed.path) - ext = Path(clean_path).suffix or ".mp3" - tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}") - - async with session.get(url) as resp: - resp.raise_for_status() - with open(tmp_file, "wb") as f: - async for chunk in resp.content.iter_chunked(64 * 1024): - f.write(chunk) + while attempt < MAX_RETRIES: + tmp_file = None + attempt += 1 + track_info["attempts"] = attempt try: - md = await sr.get_metadata_by_track_id(track_id) or {} - except MetadataFetchError as me: - # Permanent metadata failure — notify and continue (mark track failed) - msg = f"Metadata permanently failed for track {track_id}: {me}" - try: - send_log_to_discord(msg, "ERROR", target) - except Exception: - pass - track_info["status"] = "Failed" - track_info["error"] = str(me) - per_track_meta.append(track_info) - if job: - job.meta["tracks"] = per_track_meta - job.meta["progress"] = int(((i + 1) / total) * 100) - job.save_meta() - break - artist_raw = md.get("artist") or "Unknown Artist" - album_raw = md.get("album") or "Unknown Album" - title_raw = md.get("title") or f"Track {track_id}" - - artist = sanitize_filename(artist_raw) - album = sanitize_filename(album_raw) - title = sanitize_filename(title_raw) - - all_artists.add(artist) - album_dir = staging_root / artist / album - album_dir.mkdir(parents=True, exist_ok=True) - final_file = ensure_unique_path(album_dir / f"{title}{ext}") - - # Move file into final location first (tags will be updated on moved file) - tmp_file.rename(final_file) - - # Try to fetch cover art via SRUtil (use album_id from metadata) - try: - album_field = md.get("album") - album_id = md.get("album_id") or ( - album_field.get("id") - if isinstance(album_field, dict) - else None + print(f"DEBUG: Getting downloadable for track {track_id}") + # Fetch downloadable (handles DASH and others) + downloadable = await sr._safe_api_call( + sr.streamrip_client.get_downloadable, + str(track_id), + 2 if quality == "FLAC" else 1, + retries=3, ) - except Exception: - album_id = None - if album_id: + print(f"DEBUG: Got downloadable: {type(downloadable)}") + if not downloadable: + raise RuntimeError("No downloadable created") + + ext = f".{downloadable.extension}" + tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}") + + print(f"DEBUG: Starting download to {tmp_file}") + # Download + print(f"TRACK {track_id}: Starting download") try: - cover_url = await sr.get_cover_by_album_id( - album_id, size=640 + await downloadable._download(str(tmp_file), callback=lambda x=None: None) + print(f"TRACK {track_id}: Download method completed normally") + except Exception as download_e: + print(f"TRACK {track_id}: Download threw exception: {download_e}") + raise + + print(f"DEBUG: Download completed, file exists: {tmp_file.exists()}") + if not tmp_file.exists(): + raise RuntimeError(f"Download completed but no file created: {tmp_file}") + + print(f"DEBUG: Fetching metadata for track {track_id}") + # Metadata fetch + try: + md = await sr.get_metadata_by_track_id(track_id) or {} + print(f"DEBUG: Metadata fetched: {bool(md)}") + except MetadataFetchError as me: + # Permanent metadata failure — mark failed and break + track_info["status"] = "Failed" + track_info["error"] = str(me) + per_track_meta.append(track_info) + if job: + 
job.meta["tracks"] = per_track_meta + job.meta["progress"] = int(((i + 1) / total) * 100) + job.save_meta() + break + + artist_raw = md.get("artist") or "Unknown Artist" + album_raw = md.get("album") or "Unknown Album" + title_raw = md.get("title") or f"Track {track_id}" + + artist = sanitize_filename(artist_raw) + album = sanitize_filename(album_raw) + title = sanitize_filename(title_raw) + + print(f"TRACK {track_id}: Processing '{title}' by {artist}") + + all_artists.add(artist) + album_dir = staging_root / artist / album + album_dir.mkdir(parents=True, exist_ok=True) + final_file = ensure_unique_path(album_dir / f"{title}{ext}") + + # Move to final location + print(f"TRACK {track_id}: Moving to final location...") + tmp_file.rename(final_file) + print(f"TRACK {track_id}: File moved successfully") + + # Fetch cover art + try: + album_field = md.get("album") + album_id = md.get("album_id") or ( + album_field.get("id") if isinstance(album_field, dict) else None ) except Exception: - cover_url = None - else: - cover_url = md.get("cover_url") + album_id = None - # Embed tags + artwork using music_tag if available, falling back to mediafile tagging - embedded = False - try: + if album_id: + try: + cover_url = await sr.get_cover_by_album_id(album_id, size=640) + except Exception: + cover_url = None + else: + cover_url = md.get("cover_url") + + # Embed tags + embedded = False + img_bytes = None if cover_url: try: timeout = aiohttp.ClientTimeout(total=15) - async with session.get( - cover_url, timeout=timeout - ) as img_resp: + async with session.get(cover_url, timeout=timeout) as img_resp: if img_resp.status == 200: img_bytes = await img_resp.read() else: img_bytes = None - # Notify Discord about failed cover download (HTTP error) try: send_log_to_discord( f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`", @@ -394,7 +422,6 @@ def bulk_download(track_list: list, quality: str = "FLAC"): pass except Exception as e: img_bytes = None - # Notify Discord about exception during cover download try: send_log_to_discord( f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`", @@ -403,40 +430,40 @@ def bulk_download(track_list: list, quality: str = "FLAC"): ) except Exception: pass - else: - img_bytes = None - # Prefer music_tag if available (keeps compatibility with add_cover_art.py) + # Try music_tag first try: from music_tag import load_file as mt_load_file # type: ignore + # Add validation for `mf` object try: mf = mt_load_file(str(final_file)) - # set basic tags - if md.get("title"): - mf["title"] = md.get("title") - if md.get("artist"): - mf["artist"] = md.get("artist") - if md.get("album"): - mf["album"] = md.get("album") - tracknum = md.get("track_number") - if tracknum is not None: - try: - mf["tracknumber"] = int(tracknum) - except Exception: - pass - if img_bytes: - mf["artwork"] = img_bytes - mf.save() - embedded = True + if mf is not None: + if md.get("title"): + mf["title"] = md.get("title") + if md.get("artist"): + mf["artist"] = md.get("artist") + if md.get("album"): + mf["album"] = md.get("album") + tracknum = md.get("track_number") + if tracknum is not None: + try: + mf["tracknumber"] = int(tracknum) + except Exception: + pass + if img_bytes: + mf["artwork"] = img_bytes + mf.save() + embedded = True + else: + logger.error("Failed to load file with music_tag.") + embedded = False except Exception: embedded = False except Exception: embedded 
= False
 
-                    # If music_tag not available or failed, fallback to mediafile tagging
                     if not embedded:
-                        # If we had a cover_url but no bytes, log a warning to Discord
                         try:
                             if cover_url and not img_bytes:
                                 send_log_to_discord(
@@ -446,82 +473,72 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
                                 )
                         except Exception:
                             pass
-                    tag_with_mediafile(str(final_file), md)
-                except Exception:
-                    # Ensure at least the basic tags are written
-                    try:
-                        tag_with_mediafile(str(final_file), md)
-                    except Exception:
-                        pass
-                    tmp_file = None
+                        try:
+                            tag_with_mediafile(str(final_file), md)
+                        except Exception:
+                            pass
 
-                    track_info["status"] = "Success"
-                    track_info["file_path"] = str(final_file)
-                    track_info["error"] = None
-                    all_final_files.append(final_file)
+                    # Success
+                    tmp_file = None
+                    track_info["status"] = "Success"
+                    track_info["file_path"] = str(final_file)
+                    track_info["error"] = None
+                    all_final_files.append(final_file)
 
-                    if job:
-                        job.meta["progress"] = int(((i + 1) / total) * 100)
-                        job.meta["tracks"] = per_track_meta + [track_info]
-                        job.save_meta()
-                    break
+                    print(f"TRACK {track_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%")
 
-                except aiohttp.ClientResponseError as e:
-                    msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}"
-                    send_log_to_discord(msg, "WARNING", target)
-                    if e.status == 429:
-                        wait_time = min(60, 2**attempt)
-                        await asyncio.sleep(wait_time)
-                    else:
-                        await asyncio.sleep(
-                            random.uniform(THROTTLE_MIN, THROTTLE_MAX)
-                        )
+                    if job:
+                        job.meta["progress"] = int(((i + 1) / total) * 100)
+                        job.meta["tracks"] = per_track_meta + [track_info]
+                        job.save_meta()
+                    break
 
-                except Exception as e:
-                    tb = traceback.format_exc()
-                    is_no_stream_url = (
-                        isinstance(e, RuntimeError) and str(e) == "No stream URL"
-                    )
-                    if is_no_stream_url:
-                        if attempt == 1 or attempt == MAX_RETRIES:
-                            msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}"
-                            send_log_to_discord(msg, "ERROR", target)
-                        track_info["error"] = str(e)
-                        if attempt >= MAX_RETRIES:
-                            track_info["status"] = "Failed"
-                            send_log_to_discord(
-                                f"Track {track_id} failed after {attempt} attempts",
-                                "ERROR",
-                                target,
-                            )
-                        await asyncio.sleep(
-                            random.uniform(THROTTLE_MIN, THROTTLE_MAX)
-                        )
-                    else:
-                        msg = (
-                            f"Track {track_id} attempt {attempt} failed: {e}\n{tb}"
-                        )
-                        send_log_to_discord(msg, "ERROR", target)
-                        track_info["error"] = str(e)
-                        if attempt >= MAX_RETRIES:
-                            track_info["status"] = "Failed"
-                            send_log_to_discord(
-                                f"Track {track_id} failed after {attempt} attempts",
-                                "ERROR",
-                                target,
-                            )
-                        await asyncio.sleep(
-                            random.uniform(THROTTLE_MIN, THROTTLE_MAX)
-                        )
+                except aiohttp.ClientResponseError as e:
+                    msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}"
+                    send_log_to_discord(msg, "WARNING", target)
+                    if getattr(e, "status", None) == 429:
+                        wait_time = min(60, 2 ** attempt)
+                        await asyncio.sleep(wait_time)
+                    else:
+                        await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
+
+                except Exception as e:
+                    tb = traceback.format_exc()
+                    # Match the error raised by the new downloadable flow as well
+                    # as the legacy "No stream URL" message.
+                    is_no_stream_url = isinstance(e, RuntimeError) and str(e) in (
+                        "No stream URL",
+                        "No downloadable created",
+                    )
+                    if is_no_stream_url:
+                        # Log only the first and last attempts to cut noise
+                        if attempt == 1 or attempt == MAX_RETRIES:
+                            msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}"
+                            send_log_to_discord(msg, "ERROR", target)
+                    else:
+                        msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}"
+                        send_log_to_discord(msg, "ERROR", target)
+                    track_info["error"] = str(e)
+                    if attempt >= MAX_RETRIES:
+                        track_info["status"] = "Failed"
+                        send_log_to_discord(
+                            f"Track {track_id} failed after {attempt} attempts",
+                            "ERROR",
+                            target,
+                        )
+                    await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
 
-                    finally:
-                        try:
-                            if tmp_file and tmp_file.exists():
-                                os.remove(tmp_file)
-                        except Exception:
-                            pass
+                finally:
+                    try:
+                        if tmp_file and tmp_file.exists():
+                            os.remove(tmp_file)
+                    except Exception:
+                        pass
 
-                    per_track_meta.append(track_info)
+                per_track_meta.append(track_info)
 
     if not all_final_files:
         if job:
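The retry loop above sleeps with capped exponential backoff when the API answers 429 and with a small random throttle otherwise. The policy in isolation, as a sketch (the THROTTLE_MIN/THROTTLE_MAX values here are assumptions; the module defines its own):

```python
import asyncio
import random

THROTTLE_MIN, THROTTLE_MAX = 1.0, 3.0  # assumed values for illustration

async def retry_sleep(attempt: int, rate_limited: bool) -> None:
    """Backoff policy from the loop above: 2**attempt seconds capped at 60
    for rate limits, otherwise a small random inter-request throttle."""
    if rate_limited:
        await asyncio.sleep(min(60, 2 ** attempt))
    else:
        await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))

# attempt=1 -> 2s, attempt=5 -> 32s, attempt=6 and beyond -> capped at 60s
```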
@@ -662,7 +679,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
     loop = asyncio.new_event_loop()
     asyncio.set_event_loop(loop)
     try:
-        return loop.run_until_complete(process_tracks())
+        return loop.run_until_complete(process_tracks(track_list))
     except Exception as e:
         send_log_to_discord(
             f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target
@@ -672,3 +689,12 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
             job.save_meta()
     finally:
         loop.close()
+
+# FLAC verification helper: run this on each downloaded file (e.g. from
+# process_tracks after a download completes) rather than on a synthetic path.
+async def verify_flac_stream(file_path: str) -> bool:
+    """Return True if ffprobe reports a FLAC stream in file_path."""
+    if not await check_flac_stream(file_path):
+        logger.error("No FLAC stream found in %s; skipping file.", file_path)
+        return False
+    return True
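For reference, `check_flac_stream` (added at the top of `utils/rip_background.py`) shells out to ffprobe and looks for a `flac` codec on the first audio stream. A standalone sketch of the same check, runnable outside the module (requires ffprobe on PATH; the sample path is hypothetical):

```python
import asyncio

async def has_flac_stream(file_path: str) -> bool:
    """Ask ffprobe for the codec of the first audio stream and report
    whether it is FLAC."""
    proc = await asyncio.create_subprocess_exec(
        "ffprobe", "-v", "error", "-select_streams", "a:0",
        "-show_entries", "stream=codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1", file_path,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return b"flac" in stdout

if __name__ == "__main__":
    print(asyncio.run(has_flac_stream("/tmp/example.flac")))  # hypothetical path
```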