diff --git a/base.py b/base.py index d29b4d4..2d33c3c 100644 --- a/base.py +++ b/base.py @@ -108,6 +108,7 @@ routes: dict = { "lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch( app, util, constants ), + "lrclib": importlib.import_module("endpoints.lrclib").LRCLib(app, util, constants), "yt": importlib.import_module("endpoints.yt").YT(app, util, constants), "radio": importlib.import_module("endpoints.radio").Radio( app, util, constants, loop diff --git a/endpoints/constructors.py b/endpoints/constructors.py index ac16050..491bb72 100644 --- a/endpoints/constructors.py +++ b/endpoints/constructors.py @@ -110,6 +110,25 @@ class ValidLyricRequest(BaseModel): } +class ValidLRCLibRequest(BaseModel): + """ + Request model for lyric search. + + Attributes: + - **artist** (str): Artist. + - **song** (str): Song. + - **duration** (Optional[int]): Optional duration. + """ + + artist: Optional[str] = None + song: Optional[str] = None + duration: Optional[int] = None + + model_config = { + "json_schema_extra": {"examples": [{"artist": "eminem", "song": "rap god"}]} + } + + class ValidTypeAheadRequest(BaseModel): """ Request model for typeahead query. diff --git a/endpoints/lighting.py b/endpoints/lighting.py index e7d36a1..9a36e94 100644 --- a/endpoints/lighting.py +++ b/endpoints/lighting.py @@ -4,6 +4,8 @@ import os import time import aiohttp import asyncio +import traceback +from datetime import datetime from fastapi import FastAPI, Depends, HTTPException, Request from fastapi_throttle import RateLimiter from fastapi.responses import JSONResponse @@ -17,10 +19,73 @@ from pycync import Auth # type: ignore from pycync.exceptions import TwoFactorRequiredError, AuthFailedError # type: ignore import inspect import getpass +from typing import Optional + +# Configure logging to write to a file for specific events +logging.basicConfig( + filename="cync_auth_events.log", + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) + + +def _mask_token(token: Optional[str]) -> str: + """Mask sensitive token data for logging, showing only first/last 4 chars.""" + if not token or len(token) < 8: + return "" + return f"{token[:4]}...{token[-4:]}" + + +def _log_token_state(user, context: str): + """Log masked token state for debugging.""" + if not user: + logging.info(f"{context} - No user object available") + return + + try: + logging.info( + f"{context} - Token state: access=%s refresh=%s expires_at=%s", + _mask_token(getattr(user, "access_token", None)), + _mask_token(getattr(user, "refresh_token", None)), + getattr(user, "expires_at", None), + ) + except Exception as e: + logging.error(f"Error logging token state: {e}") class Lighting(FastAPI): - async def ensure_cync_connection(self): + async def _close_session_safely(self): + """Safely close the current session if it exists.""" + if self.session and not getattr(self.session, "closed", True): + try: + await self.session.close() + # Wait a bit for the underlying connections to close + await asyncio.sleep(0.1) + except Exception as e: + logging.warning(f"Error closing session: {e}") + self.session = None + self.auth = None + self.cync_api = None + + async def _test_connection_health(self) -> bool: + """Test if the current connection is healthy by making a simple API call.""" + if ( + not self.cync_api + or not self.session + or getattr(self.session, "closed", True) + ): + return False + + try: + # Make a simple API call to test connectivity + devices = self.cync_api.get_devices() + # Just check if we get a response without errors + return devices is not None + except Exception as e: + logging.warning(f"Connection health check failed: {e}") + return False + + async def ensure_cync_connection(self, force_reconnect: bool = False): """Ensure aiohttp session and Cync API are alive, re-create if needed.""" # Check required environment variables missing_vars = [] @@ -34,29 +99,49 @@ class Lighting(FastAPI): raise Exception( f"Missing required environment variables: {', '.join(missing_vars)}" ) + # Cast to str after check to silence linter cync_email: str = self.cync_email # type: ignore cync_password: str = self.cync_password # type: ignore - # Check if session is closed or missing - if not self.session or getattr(self.session, "closed", False): - self.session = aiohttp.ClientSession() + # If force_reconnect is True or connection is unhealthy, rebuild everything + if force_reconnect or not await self._test_connection_health(): + logging.info( + "Connection unhealthy or force reconnect requested. Rebuilding connection..." + ) + + # Clean up existing connection + await self._close_session_safely() + + # Create new session with timeout configuration + timeout = aiohttp.ClientTimeout(total=30, connect=10) + connector = aiohttp.TCPConnector( + limit=100, + limit_per_host=30, + ttl_dns_cache=300, + use_dns_cache=True, + keepalive_timeout=60, + enable_cleanup_closed=True, + ) + self.session = aiohttp.ClientSession(timeout=timeout, connector=connector) + # Load cached token and check validity self.cync_user = None cached_user = self._load_cached_user() token_status = None - if cached_user: - if hasattr(cached_user, "expires_at"): - if cached_user.expires_at > time.time(): - token_status = "valid" - else: - token_status = "expired" + + if cached_user and hasattr(cached_user, "expires_at"): + # Add buffer time - consider token expired if less than 5 minutes remaining + buffer_time = 300 # 5 minutes + if cached_user.expires_at > (time.time() + buffer_time): + token_status = "valid" else: - token_status = "missing expires_at" + token_status = "expired" else: - token_status = "no cached user" + token_status = "no cached user or missing expires_at" + logging.info(f"Cync token status: {token_status}") - + if token_status == "valid" and cached_user is not None: # Use cached token self.auth = Auth( @@ -68,15 +153,26 @@ class Lighting(FastAPI): self.cync_user = cached_user logging.info("Reusing valid cached token, no 2FA required.") else: - # Need fresh login + # Need fresh login - clear any cached user that's expired + if token_status == "expired": + try: + os.remove(self.token_cache_path) + logging.info("Removed expired token cache") + except (OSError, FileNotFoundError): + pass + + logging.info("Initializing new Auth instance...") self.auth = Auth( session=self.session, username=cync_email, password=cync_password, ) try: + logging.info("Attempting fresh login...") self.cync_user = await self.auth.login() + _log_token_state(self.cync_user, "After fresh login") self._save_cached_user(self.cync_user) + logging.info("Fresh login successful") except TwoFactorRequiredError: twofa_code = os.getenv("CYNC_2FA_CODE") if not twofa_code: @@ -85,11 +181,18 @@ class Lighting(FastAPI): if twofa_code: logging.info("Retrying Cync login with 2FA code.") try: - self.cync_user = await self.auth.login(two_factor_code=twofa_code) + self.cync_user = await self.auth.login( + two_factor_code=twofa_code + ) self._save_cached_user(self.cync_user) logging.info("Logged in with 2FA successfully.") except Exception as e: logging.error("Cync 2FA login failed: %s", e) + logging.info( + "2FA failure details: Code=%s, User=%s", + twofa_code, + self.cync_user, + ) raise Exception("Cync 2FA code invalid or not accepted.") else: logging.error("Cync 2FA required but no code provided.") @@ -97,13 +200,149 @@ class Lighting(FastAPI): except AuthFailedError as e: logging.error("Failed to authenticate with Cync API: %s", e) raise Exception("Cync authentication failed.") - self.cync_api = await Cync.create(self.auth) - # Also check if cync_api is None (shouldn't happen, but just in case) - if not self.cync_api: - if not self.auth: - logging.critical("self.auth: %s", self.auth) - return - self.cync_api = await Cync.create(self.auth) + + # Create new Cync API instance + try: + logging.info("Creating Cync API instance...") + _log_token_state(self.auth.user, "Before Cync.create") + + # Check if token needs refresh before API creation + now = time.time() + expires_at = getattr(self.auth.user, "expires_at", 0) + time_until_expiry = expires_at - now + logging.info( + f"Token expires in {int(time_until_expiry / 60)} minutes (at {datetime.fromtimestamp(expires_at).isoformat()})" + ) + + # Always try refresh if we're reusing a cached token + if token_status == "valid": + logging.info("Testing cached token with refresh attempt") + try: + refresh = getattr(self.auth, "async_refresh_user_token", None) + if callable(refresh): + # Log session state before refresh + if hasattr(self.session, "cookie_jar"): + try: + cookie_count = len(self.session.cookie_jar) + logging.info( + f"Session has {cookie_count} cookies before refresh" + ) + except Exception as cookie_e: + logging.warning( + f"Could not check cookies: {cookie_e}" + ) + + result = refresh() + if inspect.isawaitable(result): + try: + await result + logging.info("Token refresh test succeeded") + except AuthFailedError: + logging.warning( + "Cached token rejected by server despite being valid locally" + ) + # Clear cached token and force fresh login + try: + os.remove(self.token_cache_path) + logging.info("Cleared rejected token cache") + except (OSError, FileNotFoundError): + pass + + logging.info( + "Attempting fresh login after refresh rejection..." + ) + self.auth = Auth( + session=self.session, + username=cync_email, + password=cync_password, + ) + self.cync_user = await self.auth.login() + self._save_cached_user(self.cync_user) + logging.info( + "Fresh login successful after refresh rejection" + ) + else: + logging.warning( + "Refresh method returned non-awaitable result" + ) + except Exception as refresh_e: + logging.error(f"Pre-API refresh failed: {refresh_e}") + logging.error( + "Refresh error traceback:\n%s", traceback.format_exc() + ) + + self.cync_api = await Cync.create(self.auth) + logging.info("Cync API connection established successfully") + except Exception as e: + logging.error("Failed to create Cync API instance") + logging.error("Exception details: %s", str(e)) + logging.error("Traceback:\n%s", traceback.format_exc()) + + # Save diagnostic info + diagnostic_data = { + "timestamp": datetime.now().isoformat(), + "error_type": type(e).__name__, + "error_message": str(e), + "auth_state": { + "has_auth": bool(self.auth), + "has_user": bool(getattr(self.auth, "user", None)), + "user_state": { + "access_token": _mask_token( + getattr(self.auth.user, "access_token", None) + ) + if self.auth and self.auth.user + else None, + "refresh_token": _mask_token( + getattr(self.auth.user, "refresh_token", None) + ) + if self.auth and self.auth.user + else None, + "expires_at": getattr(self.auth.user, "expires_at", None) + if self.auth and self.auth.user + else None, + "time_until_expiry_minutes": int( + (getattr(self.auth.user, "expires_at", 0) - time.time()) + / 60 + ) + if self.auth and self.auth.user + else None, + "refresh_method_exists": hasattr( + self.auth, "async_refresh_user_token" + ) + if self.auth + else False, + "refresh_method_callable": callable( + getattr(self.auth, "async_refresh_user_token", None) + ) + if self.auth + else False, + } + if self.auth and self.auth.user + else None, + }, + } + diagnostic_file = f"cync_api_failure-{int(time.time())}.json" + try: + with open(diagnostic_file, "w") as f: + json.dump(diagnostic_data, f, indent=2) + logging.info( + f"Saved API creation diagnostic data to {diagnostic_file}" + ) + except Exception as save_error: + logging.error(f"Failed to save diagnostic data: {save_error}") + raise + + # Final validation + if ( + not self.cync_api + or not self.session + or getattr(self.session, "closed", True) + ): + logging.error("Connection validation failed after setup") + _log_token_state( + getattr(self.auth, "user", None), "Failed connection validation" + ) + raise Exception("Failed to establish proper Cync connection") """ Lighting Endpoints @@ -129,6 +368,7 @@ class Lighting(FastAPI): self.auth = None self.cync_user = None self.cync_api = None + self.health_check_task: Optional[asyncio.Task] = None # Set up Cync connection at startup using FastAPI event @app.on_event("startup") @@ -147,14 +387,30 @@ class Lighting(FastAPI): ) # Use ensure_cync_connection which has proper token caching - await self.ensure_cync_connection() + try: + await self.ensure_cync_connection() + logging.info("Cync lighting system initialized successfully") + except Exception as e: + logging.error(f"Failed to initialize Cync connection at startup: {e}") + # Don't raise - allow server to start, connection will be retried on first request - # Create persistent Cync API object - if self.auth: - self.cync_api = await Cync.create(self.auth) + # Schedule periodic token validation and connection health checks + self.health_check_task = asyncio.create_task(self._schedule_health_checks()) - # Schedule periodic token validation - asyncio.create_task(self._schedule_token_validation()) + @app.on_event("shutdown") + async def shutdown_event(): + # Cancel health check task + if self.health_check_task and not self.health_check_task.done(): + self.health_check_task.cancel() + try: + await self.health_check_task + except asyncio.CancelledError: + logging.info("Health check task cancelled successfully") + pass + + # Clean up connections + await self._close_session_safely() + logging.info("Cync lighting system shut down cleanly") # Register endpoints self.endpoints: dict = { @@ -189,19 +445,55 @@ class Lighting(FastAPI): logging.error("Auth object is not initialized.") raise Exception("Cync authentication not initialized.") try: - user = getattr(self.auth, 'user', None) + user = getattr(self.auth, "user", None) + _log_token_state(user, "Before refresh attempt") + if user and hasattr(user, "expires_at") and user.expires_at > time.time(): - refresh = getattr(self.auth, 'async_refresh_user_token', None) + refresh = getattr(self.auth, "async_refresh_user_token", None) if callable(refresh): try: + logging.info("Attempting token refresh...") result = refresh() if inspect.isawaitable(result): await result + logging.info( + "Token refresh completed successfully (awaited)" + ) else: - pass # do nothing if not awaitable + logging.info("Token refresh completed (non-awaitable)") except AuthFailedError as e: - logging.warning("Token refresh failed: %s", e) - login = getattr(self.auth, 'login', None) + logging.error("Token refresh failed with AuthFailedError") + logging.error("Exception details: %s", str(e)) + logging.error("Traceback:\n%s", traceback.format_exc()) + + # Save diagnostic info to file + diagnostic_data = { + "timestamp": datetime.now().isoformat(), + "error_type": "AuthFailedError", + "error_message": str(e), + "user_state": { + "access_token": _mask_token( + getattr(user, "access_token", None) + ), + "refresh_token": _mask_token( + getattr(user, "refresh_token", None) + ), + "expires_at": getattr(user, "expires_at", None), + }, + } + try: + diagnostic_file = ( + f"cync_auth_failure-{int(time.time())}.json" + ) + with open(diagnostic_file, "w") as f: + json.dump(diagnostic_data, f, indent=2) + logging.info(f"Saved diagnostic data to {diagnostic_file}") + except Exception as save_error: + logging.error( + f"Failed to save diagnostic data: {save_error}" + ) + raise + login = getattr(self.auth, "login", None) if callable(login): try: result = login() @@ -229,6 +521,11 @@ class Lighting(FastAPI): logging.info("Logged in with 2FA successfully.") except Exception as e: logging.error("Cync 2FA login failed: %s", e) + logging.info( + "2FA failure details: Code=%s, User=%s", + twofa_code, + self.cync_user, + ) raise Exception("Cync 2FA code invalid or not accepted.") else: logging.error("Cync 2FA required but no code provided.") @@ -242,16 +539,39 @@ class Lighting(FastAPI): logging.error("Unexpected error during authentication: %s", e) raise - async def _schedule_token_validation(self): + async def _schedule_health_checks(self): + """Periodic health checks and token validation.""" while True: try: - await asyncio.sleep(300) - user = getattr(self.auth, 'user', None) - if user and hasattr(user, "expires_at") and user.expires_at - time.time() < 600: - logging.info("Token is about to expire. Refreshing...") - await self._refresh_or_login() + await asyncio.sleep(300) # Check every 5 minutes + + # Check token expiration (refresh if less than 10 minutes left) + if self.cync_user and hasattr(self.cync_user, "expires_at"): + expires_at = getattr(self.cync_user, "expires_at", 0) + time_until_expiry = expires_at - time.time() + if time_until_expiry < 600: # Less than 10 minutes + logging.info( + f"Token expires in {int(time_until_expiry / 60)} minutes. Refreshing..." + ) + try: + await self._refresh_or_login() + except Exception as e: + logging.error( + f"Token refresh failed during health check: {e}" + ) + + # Test connection health + if not await self._test_connection_health(): + logging.warning( + "Connection health check failed. Will reconnect on next API call." + ) + + except asyncio.CancelledError: + logging.info("Health check task cancelled") + break except Exception as e: - logging.error("Error during periodic token validation: %s", e) + logging.error(f"Error during periodic health check: {e}") + # Continue the loop even on errors def _load_cached_user(self): try: @@ -356,20 +676,31 @@ class Lighting(FastAPI): else: rgb = None - # Apply to Cync device with retry on connection issues - max_retries = 2 + # Apply to Cync device with robust retry and error handling + max_retries = 3 + last_exception: Exception = Exception("No attempts made") + for attempt in range(max_retries): try: - # Use persistent Cync API object + # Ensure connection before each attempt + force_reconnect = attempt > 0 # Force reconnect on retries + await self.ensure_cync_connection(force_reconnect=force_reconnect) + if not self.cync_api: - logging.warning("Cync API not initialized, attempting to reconnect...") - await self.ensure_cync_connection() - if not self.cync_api: - raise Exception("Cync API still not initialized after reconnection.") - - logging.info("Getting devices from Cync API...") + raise Exception("Cync API not available after connection setup") + + logging.info( + f"Attempt {attempt + 1}/{max_retries}: Getting devices from Cync API..." + ) devices = self.cync_api.get_devices() - logging.info(f"Devices returned from Cync API: {[getattr(d, 'name', None) for d in devices]}") + + if not devices: + raise Exception("No devices returned from Cync API") + + logging.info( + f"Devices returned: {[getattr(d, 'name', 'unnamed') for d in devices]}" + ) + light = next( ( d @@ -378,52 +709,101 @@ class Lighting(FastAPI): ), None, ) + if not light: - logging.error(f"Device '{self.cync_device_name}' not found in {[getattr(d, 'name', None) for d in devices]}") - raise Exception(f"Device '{self.cync_device_name}' not found") - logging.info(f"Selected device: {light}") + available_devices = [ + getattr(d, "name", "unnamed") for d in devices + ] + raise Exception( + f"Device '{self.cync_device_name}' not found. Available devices: {available_devices}" + ) + + logging.info( + f"Selected device: {getattr(light, 'name', 'unnamed')}" + ) + + # Execute device operations + operations_completed = [] + # Set power if power == "on": result = await light.turn_on() - logging.info(f"turn_on result: {result}") + operations_completed.append(f"turn_on: {result}") else: result = await light.turn_off() - logging.info(f"turn_off result: {result}") + operations_completed.append(f"turn_off: {result}") + # Set brightness if "brightness" in state: result = await light.set_brightness(brightness) - logging.info(f"set_brightness result: {result}") + operations_completed.append( + f"set_brightness({brightness}): {result}" + ) + # Set color if rgb: result = await light.set_rgb(rgb) - logging.info(f"set_rgb result: {result}") + operations_completed.append(f"set_rgb({rgb}): {result}") + logging.info( + f"All operations completed successfully: {operations_completed}" + ) break # Success, exit retry loop - except (aiohttp.ClientConnectionError, aiohttp.ClientOSError) as e: + + except ( + aiohttp.ClientConnectionError, + aiohttp.ClientOSError, + aiohttp.ServerDisconnectedError, + aiohttp.ClientConnectorError, + ConnectionResetError, + ConnectionError, + OSError, + asyncio.TimeoutError, + ) as e: + last_exception = e + logging.warning( + f"Connection/network error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}" + ) + if attempt < max_retries - 1: + # Wait a bit before retry to allow network/server recovery + await asyncio.sleep( + 2**attempt + ) # Exponential backoff: 1s, 2s, 4s + continue + + except (AuthFailedError, TwoFactorRequiredError) as e: + last_exception = e + logging.error( + f"Authentication error (attempt {attempt + 1}/{max_retries}): {e}" + ) + if attempt < max_retries - 1: + # Clear cached tokens on auth errors + try: + os.remove(self.token_cache_path) + logging.info("Cleared token cache due to auth error") + except (OSError, FileNotFoundError): + pass + await asyncio.sleep(1) + continue + + except Exception as e: + last_exception = e + error_msg = f"Unexpected error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}" + logging.error(error_msg) + + # On unexpected errors, try reconnecting for next attempt if attempt < max_retries - 1: logging.warning( - "Connection closed (attempt %d/%d): %s. Retrying with reconnection.", - attempt + 1, - max_retries, - e, + "Forcing full reconnection due to unexpected error..." ) - await self.ensure_cync_connection() - else: - logging.error( - "Connection failed after %d attempts: %s", - max_retries, - e, - ) - raise - except Exception as e: - logging.error("Unexpected error during device operation: %s", e) - logging.error("Error type: %s", type(e).__name__) - # Try to reconnect on any error for next attempt - if attempt < max_retries - 1: - logging.warning("Attempting reconnection due to error...") - await self.ensure_cync_connection() - else: - raise + await asyncio.sleep(1) + continue + + # If we get here, all retries failed + logging.error( + f"All {max_retries} attempts failed. Last error: {type(last_exception).__name__}: {last_exception}" + ) + raise last_exception logging.info( "Successfully applied state to device '%s': %s", diff --git a/endpoints/lrclib.py b/endpoints/lrclib.py new file mode 100644 index 0000000..7397e68 --- /dev/null +++ b/endpoints/lrclib.py @@ -0,0 +1,199 @@ +import urllib.parse +from fastapi import FastAPI, HTTPException, Depends +from fastapi_throttle import RateLimiter +from fastapi.responses import JSONResponse +from typing import Type, Optional +from sqlalchemy import ( + and_, + true, + Column, + Integer, + String, + Float, + Boolean, + DateTime, + ForeignKey, + UniqueConstraint, + create_engine, +) +from sqlalchemy.orm import Session, relationship +from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta +from sqlalchemy.orm import sessionmaker +from .constructors import ValidLRCLibRequest +from lyric_search.constructors import LRCLibResult +from lyric_search import notifier +from sqlalchemy.orm import foreign + +Base: Type[DeclarativeMeta] = declarative_base() + + +class Tracks(Base): # type: ignore + __tablename__ = "tracks" + + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String) + name_lower = Column(String, index=True) + artist_name = Column(String) + artist_name_lower = Column(String, index=True) + album_name = Column(String) + album_name_lower = Column(String, index=True) + duration = Column(Float, index=True) + last_lyrics_id = Column(Integer, ForeignKey("lyrics.id"), index=True) + created_at = Column(DateTime) + updated_at = Column(DateTime) + + # Relationships + lyrics = relationship( + "Lyrics", + back_populates="track", + foreign_keys=[last_lyrics_id], + primaryjoin="Tracks.id == foreign(Lyrics.track_id)", # Use string reference for Lyrics + ) + + # Constraints + __table_args__ = ( + UniqueConstraint( + "name_lower", + "artist_name_lower", + "album_name_lower", + "duration", + name="uq_tracks", + ), + ) + + +class Lyrics(Base): # type: ignore + __tablename__ = "lyrics" + + id = Column(Integer, primary_key=True, autoincrement=True) + plain_lyrics = Column(String) + synced_lyrics = Column(String) + track_id = Column(Integer, ForeignKey("tracks.id"), index=True) + has_plain_lyrics = Column(Boolean, index=True) + has_synced_lyrics = Column(Boolean, index=True) + instrumental = Column(Boolean) + source = Column(String, index=True) + created_at = Column(DateTime, index=True) + updated_at = Column(DateTime) + + # Relationships + track = relationship( + "Tracks", + back_populates="lyrics", + foreign_keys=[track_id], + primaryjoin=(Tracks.id == foreign(track_id)), + remote_side=Tracks.id, + ) + + +DATABASE_URL: str = "sqlite:////nvme/sqlite_dbs/lrclib.db" +engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + + +""" +TODO: + - Move retrieval to lyric_search.sources, with separate file for DB Model +""" + + +class LRCLib(FastAPI): + """ + LRCLib Cache Search Endpoint + """ + + def __init__(self, app: FastAPI, util, constants) -> None: + """Initialize LyricSearch endpoints.""" + self.app: FastAPI = app + self.util = util + self.constants = constants + self.declarative_base = declarative_base() + self.notifier = notifier.DiscordNotifier() + + self.endpoints: dict = { + "lrclib/search": self.lyric_search_handler, + } + + for endpoint, handler in self.endpoints.items(): + times: int = 20 + seconds: int = 2 + rate_limit: tuple[int, int] = (2, 3) # Default; (Times, Seconds) + (times, seconds) = rate_limit + + app.add_api_route( + f"/{endpoint}", + handler, + methods=["POST"], + include_in_schema=True, + dependencies=[Depends(RateLimiter(times=times, seconds=seconds))], + ) + + async def lyric_search_handler( + self, data: ValidLRCLibRequest, db: Session = Depends(get_db) + ) -> JSONResponse: + """ + Search for lyrics. + + Parameters: + - **data** (ValidLRCLibRequest): Request containing artist, song, and other parameters. + + Returns: + - **JSONResponse**: LRCLib data or error. + """ + if not data.artist or not data.song: + raise HTTPException(detail="Invalid request", status_code=500) + + search_artist: str = urllib.parse.unquote(data.artist).lower() + search_song: str = urllib.parse.unquote(data.song).lower() + search_duration: Optional[int] = data.duration + + if not isinstance(search_artist, str) or not isinstance(search_song, str): + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) + + query = ( + db.query( + Tracks.id.label("id"), + Tracks.artist_name.label("artist"), + Tracks.name.label("song"), + Lyrics.plain_lyrics.label("plainLyrics"), + Lyrics.synced_lyrics.label("syncedLyrics"), + ) + .join(Lyrics, Tracks.id == Lyrics.track_id) + .filter( + and_( + Tracks.artist_name_lower == search_artist, + Tracks.name == search_song, + Tracks.duration == search_duration if search_duration else true(), + ) + ) + ) + + db_result = query.first() + if not db_result: + return JSONResponse( + status_code=404, content={"err": True, "errorText": "No result found."} + ) + + result = LRCLibResult( + id=db_result.id, + artist=db_result.artist, + song=db_result.song, + plainLyrics=db_result.plainLyrics, + syncedLyrics=db_result.syncedLyrics, + ) + + return JSONResponse(content=vars(result)) diff --git a/endpoints/lyric_search.py b/endpoints/lyric_search.py index 062826a..6e75fe0 100644 --- a/endpoints/lyric_search.py +++ b/endpoints/lyric_search.py @@ -212,7 +212,7 @@ class LyricSearch(FastAPI): seeked_found_line: Optional[int] = None # Split lyrics into lines based on
, newline characters, or " / " lyrics_text = result["lyrics"].strip() - + # Determine the delimiter and split accordingly if "
" in lyrics_text: lyric_lines = lyrics_text.split("
") @@ -223,9 +223,9 @@ class LyricSearch(FastAPI): else: lyric_lines = lyrics_text.split("\n") separator = "\n" - + search_term = data.sub.strip().lower() - + # First try single-line matching (existing behavior) for i, line in enumerate(lyric_lines): # Remove any special characters and extra spaces @@ -233,38 +233,46 @@ class LyricSearch(FastAPI): if search_term in cleaned_line.lower(): seeked_found_line = i break - + # If no single-line match found, try multi-line matching if seeked_found_line is None: # Try matching across consecutive lines (up to 5 lines for reasonable performance) max_lines_to_check = min(5, len(lyric_lines)) - + for i in range(len(lyric_lines)): for line_count in range(2, max_lines_to_check + 1): if i + line_count <= len(lyric_lines): # Combine consecutive lines with space separator combined_lines = [] - line_positions: list[tuple[int, int]] = [] # Track where each line starts in combined text + line_positions: list[ + tuple[int, int] + ] = [] # Track where each line starts in combined text combined_text_parts: list[str] = [] - + for j in range(line_count): if i + j < len(lyric_lines): - cleaned_line = regex.sub(r"\u2064", "", lyric_lines[i + j].strip()) + cleaned_line = regex.sub( + r"\u2064", "", lyric_lines[i + j].strip() + ) combined_lines.append(cleaned_line) - + # Track position of this line in the combined text - line_start_pos = len(" ".join(combined_text_parts).lower()) + line_start_pos = len( + " ".join(combined_text_parts).lower() + ) if line_start_pos > 0: - line_start_pos += 1 # Account for space separator + line_start_pos += ( + 1 # Account for space separator + ) line_positions.append((i + j, line_start_pos)) combined_text_parts.append(cleaned_line) - + combined_text = " ".join(combined_lines).lower() - + if search_term in combined_text: # Find which specific line the match starts in match_pos = combined_text.find(search_term) - + # Find the line that contains the start of the match actual_start_line = i # Default fallback for line_idx, line_start_pos in line_positions: @@ -272,10 +280,10 @@ class LyricSearch(FastAPI): actual_start_line = line_idx else: break - + seeked_found_line = actual_start_line break - + if seeked_found_line is not None: break diff --git a/lyric_search/constructors.py b/lyric_search/constructors.py index 73bf626..616c02d 100644 --- a/lyric_search/constructors.py +++ b/lyric_search/constructors.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Union +from typing import Union, Optional @dataclass @@ -22,6 +22,25 @@ class LyricsResult: time: float = 0.00 +@dataclass +class LRCLibResult: + """ + Class for returned Lyrics Results + Attributes: + id (int): returned id + artist (str): returned artist + song (str): returned song + plainLyrics (str): returned (plain) lyrics + syncedLyrics (str): returned synchronizedLyrics + """ + + id: int + artist: str + song: str + plainLyrics: Optional[str] = None + syncedLyrics: Optional[str] = None + + """ Generic """ diff --git a/utils/radio_util.py b/utils/radio_util.py index c7ed2a6..4abe163 100644 --- a/utils/radio_util.py +++ b/utils/radio_util.py @@ -486,11 +486,11 @@ class RadioUtil: ) """Loading Complete""" + self.playlists_loaded = True # Request skip from LS to bring streams current for playlist in self.playlists: logging.info("Skipping: %s", playlist) await self._ls_skip(playlist) - self.playlists_loaded = True except Exception as e: logging.info("Playlist load failed: %s", str(e)) traceback.print_exc() diff --git a/utils/rip_background.py b/utils/rip_background.py index 5ea51d0..ac048f3 100644 --- a/utils/rip_background.py +++ b/utils/rip_background.py @@ -51,8 +51,16 @@ logger = logging.getLogger(__name__) async def check_flac_stream(file_path): """Check if the given file contains a FLAC stream using ffprobe.""" cmd = [ - "ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", - "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1", file_path + "ffprobe", + "-v", + "error", + "-select_streams", + "a:0", + "-show_entries", + "stream=codec_name", + "-of", + "default=noprint_wrappers=1:nokey=1", + file_path, ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE @@ -282,263 +290,292 @@ def bulk_download(track_list: list, quality: str = "FLAC"): # Ensure aiohttp session is properly closed async with aiohttp.ClientSession(headers=HEADERS) as session: - print(f"DEBUG: Starting process_tracks with {len(track_list)} tracks") - # Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil - async def _rate_limit_notify(exc: Exception): - try: - send_log_to_discord( - f"Rate limit observed while fetching metadata: {exc}", - "WARNING", - target, - ) - except Exception: - pass + print(f"DEBUG: Starting process_tracks with {len(track_list)} tracks") - # attach callback and reset notified flag for this job run + # Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil + async def _rate_limit_notify(exc: Exception): try: - sr.on_rate_limit = _rate_limit_notify - sr._rate_limit_notified = False + send_log_to_discord( + f"Rate limit observed while fetching metadata: {exc}", + "WARNING", + target, + ) except Exception: pass - total = len(track_list or []) - for i, track_id in enumerate(track_list or []): - print(f"DEBUG: Processing track {i+1}/{total}: {track_id}") - track_info = { - "track_id": str(track_id), - "status": "Pending", - "file_path": None, - "error": None, - "attempts": 0, - } - attempt = 0 - while attempt < MAX_RETRIES: - tmp_file = None - attempt += 1 - track_info["attempts"] = attempt + # attach callback and reset notified flag for this job run + try: + sr.on_rate_limit = _rate_limit_notify + sr._rate_limit_notified = False + except Exception: + pass + total = len(track_list or []) + for i, track_id in enumerate(track_list or []): + print(f"DEBUG: Processing track {i + 1}/{total}: {track_id}") + track_info = { + "track_id": str(track_id), + "status": "Pending", + "file_path": None, + "error": None, + "attempts": 0, + } + attempt = 0 + while attempt < MAX_RETRIES: + tmp_file = None + attempt += 1 + track_info["attempts"] = attempt + + try: + print(f"DEBUG: Getting downloadable for track {track_id}") + # Fetch downloadable (handles DASH and others) + downloadable = await sr._safe_api_call( + sr.streamrip_client.get_downloadable, + str(track_id), + 2 if quality == "FLAC" else 1, + retries=3, + ) + + print(f"DEBUG: Got downloadable: {type(downloadable)}") + if not downloadable: + raise RuntimeError("No downloadable created") + + ext = f".{downloadable.extension}" + tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}") + + print(f"DEBUG: Starting download to {tmp_file}") + # Download + print(f"TRACK {track_id}: Starting download") try: - print(f"DEBUG: Getting downloadable for track {track_id}") - # Fetch downloadable (handles DASH and others) - downloadable = await sr._safe_api_call( - sr.streamrip_client.get_downloadable, - str(track_id), - 2 if quality == "FLAC" else 1, - retries=3, + await downloadable._download( + str(tmp_file), callback=lambda x=None: None + ) + print( + f"TRACK {track_id}: Download method completed normally" + ) + except Exception as download_e: + print( + f"TRACK {track_id}: Download threw exception: {download_e}" + ) + raise + + print( + f"DEBUG: Download completed, file exists: {tmp_file.exists()}" + ) + if not tmp_file.exists(): + raise RuntimeError( + f"Download completed but no file created: {tmp_file}" ) - print(f"DEBUG: Got downloadable: {type(downloadable)}") - if not downloadable: - raise RuntimeError("No downloadable created") - - ext = f".{downloadable.extension}" - tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}") - - print(f"DEBUG: Starting download to {tmp_file}") - # Download - print(f"TRACK {track_id}: Starting download") - try: - await downloadable._download(str(tmp_file), callback=lambda x=None: None) - print(f"TRACK {track_id}: Download method completed normally") - except Exception as download_e: - print(f"TRACK {track_id}: Download threw exception: {download_e}") - raise - - print(f"DEBUG: Download completed, file exists: {tmp_file.exists()}") - if not tmp_file.exists(): - raise RuntimeError(f"Download completed but no file created: {tmp_file}") - - print(f"DEBUG: Fetching metadata for track {track_id}") - # Metadata fetch - try: - md = await sr.get_metadata_by_track_id(track_id) or {} - print(f"DEBUG: Metadata fetched: {bool(md)}") - except MetadataFetchError as me: - # Permanent metadata failure — mark failed and break - track_info["status"] = "Failed" - track_info["error"] = str(me) - per_track_meta.append(track_info) - if job: - job.meta["tracks"] = per_track_meta - job.meta["progress"] = int(((i + 1) / total) * 100) - job.save_meta() - break - - artist_raw = md.get("artist") or "Unknown Artist" - album_raw = md.get("album") or "Unknown Album" - title_raw = md.get("title") or f"Track {track_id}" - - artist = sanitize_filename(artist_raw) - album = sanitize_filename(album_raw) - title = sanitize_filename(title_raw) - - print(f"TRACK {track_id}: Processing '{title}' by {artist}") - - all_artists.add(artist) - album_dir = staging_root / artist / album - album_dir.mkdir(parents=True, exist_ok=True) - final_file = ensure_unique_path(album_dir / f"{title}{ext}") - - # Move to final location - print(f"TRACK {track_id}: Moving to final location...") - tmp_file.rename(final_file) - print(f"TRACK {track_id}: File moved successfully") - - # Fetch cover art - try: - album_field = md.get("album") - album_id = md.get("album_id") or ( - album_field.get("id") if isinstance(album_field, dict) else None - ) - except Exception: - album_id = None - - if album_id: - try: - cover_url = await sr.get_cover_by_album_id(album_id, size=640) - except Exception: - cover_url = None - else: - cover_url = md.get("cover_url") - - # Embed tags - embedded = False - img_bytes = None - if cover_url: - try: - timeout = aiohttp.ClientTimeout(total=15) - async with session.get(cover_url, timeout=timeout) as img_resp: - if img_resp.status == 200: - img_bytes = await img_resp.read() - else: - img_bytes = None - try: - send_log_to_discord( - f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`", - "WARNING", - target, - ) - except Exception: - pass - except Exception as e: - img_bytes = None - try: - send_log_to_discord( - f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`", - "WARNING", - target, - ) - except Exception: - pass - - # Try music_tag first - try: - from music_tag import load_file as mt_load_file # type: ignore - - # Add validation for `mf` object - try: - mf = mt_load_file(str(final_file)) - if mf is not None: - if md.get("title"): - mf["title"] = md.get("title") - if md.get("artist"): - mf["artist"] = md.get("artist") - if md.get("album"): - mf["album"] = md.get("album") - tracknum = md.get("track_number") - if tracknum is not None: - try: - mf["tracknumber"] = int(tracknum) - except Exception: - pass - if img_bytes: - mf["artwork"] = img_bytes - mf.save() - embedded = True - else: - logger.error("Failed to load file with music_tag.") - embedded = False - except Exception: - embedded = False - except Exception: - embedded = False - - if not embedded: - try: - if cover_url and not img_bytes: - send_log_to_discord( - f"Cover art not available for track {track_id} album_id={album_id} url={cover_url}", - "WARNING", - target, - ) - except Exception: - pass - try: - tag_with_mediafile(str(final_file), md) - except Exception: - pass - - # Success - tmp_file = None - track_info["status"] = "Success" - track_info["file_path"] = str(final_file) - track_info["error"] = None - all_final_files.append(final_file) - - print(f"TRACK {track_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%") - + print(f"DEBUG: Fetching metadata for track {track_id}") + # Metadata fetch + try: + md = await sr.get_metadata_by_track_id(track_id) or {} + print(f"DEBUG: Metadata fetched: {bool(md)}") + except MetadataFetchError as me: + # Permanent metadata failure — mark failed and break + track_info["status"] = "Failed" + track_info["error"] = str(me) + per_track_meta.append(track_info) if job: + job.meta["tracks"] = per_track_meta job.meta["progress"] = int(((i + 1) / total) * 100) - job.meta["tracks"] = per_track_meta + [track_info] job.save_meta() break - except aiohttp.ClientResponseError as e: - msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}" - send_log_to_discord(msg, "WARNING", target) - if getattr(e, "status", None) == 429: - wait_time = min(60, 2 ** attempt) - await asyncio.sleep(wait_time) - else: - await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) + artist_raw = md.get("artist") or "Unknown Artist" + album_raw = md.get("album") or "Unknown Album" + title_raw = md.get("title") or f"Track {track_id}" - except Exception as e: - tb = traceback.format_exc() - is_no_stream_url = isinstance(e, RuntimeError) and str(e) == "No stream URL" - if is_no_stream_url: - if attempt == 1 or attempt == MAX_RETRIES: - msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}" - send_log_to_discord(msg, "ERROR", target) - track_info["error"] = str(e) - if attempt >= MAX_RETRIES: - track_info["status"] = "Failed" - send_log_to_discord( - f"Track {track_id} failed after {attempt} attempts", - "ERROR", - target, - ) - await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) - else: - msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}" - send_log_to_discord(msg, "ERROR", target) - track_info["error"] = str(e) - if attempt >= MAX_RETRIES: - track_info["status"] = "Failed" - send_log_to_discord( - f"Track {track_id} failed after {attempt} attempts", - "ERROR", - target, - ) - await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) + artist = sanitize_filename(artist_raw) + album = sanitize_filename(album_raw) + title = sanitize_filename(title_raw) - finally: + print(f"TRACK {track_id}: Processing '{title}' by {artist}") + + all_artists.add(artist) + album_dir = staging_root / artist / album + album_dir.mkdir(parents=True, exist_ok=True) + final_file = ensure_unique_path(album_dir / f"{title}{ext}") + + # Move to final location + print(f"TRACK {track_id}: Moving to final location...") + tmp_file.rename(final_file) + print(f"TRACK {track_id}: File moved successfully") + + # Fetch cover art + try: + album_field = md.get("album") + album_id = md.get("album_id") or ( + album_field.get("id") + if isinstance(album_field, dict) + else None + ) + except Exception: + album_id = None + + if album_id: try: - if tmp_file and tmp_file.exists(): - os.remove(tmp_file) + cover_url = await sr.get_cover_by_album_id( + album_id, size=640 + ) + except Exception: + cover_url = None + else: + cover_url = md.get("cover_url") + + # Embed tags + embedded = False + img_bytes = None + if cover_url: + try: + timeout = aiohttp.ClientTimeout(total=15) + async with session.get( + cover_url, timeout=timeout + ) as img_resp: + if img_resp.status == 200: + img_bytes = await img_resp.read() + else: + img_bytes = None + try: + send_log_to_discord( + f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`", + "WARNING", + target, + ) + except Exception: + pass + except Exception as e: + img_bytes = None + try: + send_log_to_discord( + f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`", + "WARNING", + target, + ) + except Exception: + pass + + # Try music_tag first + try: + from music_tag import load_file as mt_load_file # type: ignore + + # Add validation for `mf` object + try: + mf = mt_load_file(str(final_file)) + if mf is not None: + if md.get("title"): + mf["title"] = md.get("title") + if md.get("artist"): + mf["artist"] = md.get("artist") + if md.get("album"): + mf["album"] = md.get("album") + tracknum = md.get("track_number") + if tracknum is not None: + try: + mf["tracknumber"] = int(tracknum) + except Exception: + pass + if img_bytes: + mf["artwork"] = img_bytes + mf.save() + embedded = True + else: + logger.error("Failed to load file with music_tag.") + embedded = False + except Exception: + embedded = False + except Exception: + embedded = False + + if not embedded: + try: + if cover_url and not img_bytes: + send_log_to_discord( + f"Cover art not available for track {track_id} album_id={album_id} url={cover_url}", + "WARNING", + target, + ) + except Exception: + pass + try: + tag_with_mediafile(str(final_file), md) except Exception: pass - per_track_meta.append(track_info) + # Success + tmp_file = None + track_info["status"] = "Success" + track_info["file_path"] = str(final_file) + track_info["error"] = None + all_final_files.append(final_file) + + print( + f"TRACK {track_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%" + ) + + if job: + job.meta["progress"] = int(((i + 1) / total) * 100) + job.meta["tracks"] = per_track_meta + [track_info] + job.save_meta() + break + + except aiohttp.ClientResponseError as e: + msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}" + send_log_to_discord(msg, "WARNING", target) + if getattr(e, "status", None) == 429: + wait_time = min(60, 2**attempt) + await asyncio.sleep(wait_time) + else: + await asyncio.sleep( + random.uniform(THROTTLE_MIN, THROTTLE_MAX) + ) + + except Exception as e: + tb = traceback.format_exc() + is_no_stream_url = ( + isinstance(e, RuntimeError) and str(e) == "No stream URL" + ) + if is_no_stream_url: + if attempt == 1 or attempt == MAX_RETRIES: + msg = f"Track {track_id} attempt {attempt} failed: {e}\n{tb}" + send_log_to_discord(msg, "ERROR", target) + track_info["error"] = str(e) + if attempt >= MAX_RETRIES: + track_info["status"] = "Failed" + send_log_to_discord( + f"Track {track_id} failed after {attempt} attempts", + "ERROR", + target, + ) + await asyncio.sleep( + random.uniform(THROTTLE_MIN, THROTTLE_MAX) + ) + else: + msg = ( + f"Track {track_id} attempt {attempt} failed: {e}\n{tb}" + ) + send_log_to_discord(msg, "ERROR", target) + track_info["error"] = str(e) + if attempt >= MAX_RETRIES: + track_info["status"] = "Failed" + send_log_to_discord( + f"Track {track_id} failed after {attempt} attempts", + "ERROR", + target, + ) + await asyncio.sleep( + random.uniform(THROTTLE_MIN, THROTTLE_MAX) + ) + + finally: + try: + if tmp_file and tmp_file.exists(): + os.remove(tmp_file) + except Exception: + pass + + per_track_meta.append(track_info) if not all_final_files: if job: @@ -690,6 +727,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"): finally: loop.close() + # Correct integration of FLAC stream check async def process_tracks(track_list): for i, track_id in enumerate(track_list or []): diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py index 8fe7680..cf6b36e 100644 --- a/utils/sr_wrapper.py +++ b/utils/sr_wrapper.py @@ -19,15 +19,21 @@ class MetadataFetchError(Exception): """Raised when metadata fetch permanently fails after retries.""" -# Suppress all logging output from this module and its children -for name in [__name__, "utils.sr_wrapper"]: +# Suppress noisy logging from this module and from the `streamrip` library +# We set propagate=False so messages don't bubble up to the root logger and +# attach a NullHandler where appropriate to avoid "No handler found" warnings. +for name in [__name__, "utils.sr_wrapper", "streamrip", "streamrip.client"]: logger = logging.getLogger(name) - logger.setLevel(logging.INFO) # Temporarily set to INFO for debugging LRC + # Keep default level (or raise to WARNING) so non-important logs are dropped + try: + logger.setLevel(logging.WARNING) + except Exception: + pass logger.propagate = False - for handler in logger.handlers: - handler.setLevel(logging.INFO) -# Also set the root logger to CRITICAL as a last resort (may affect global logging) -# logging.getLogger().setLevel(logging.CRITICAL) + # Ensure a NullHandler is present so logs don't propagate and no missing-handler + # warnings are printed when the package emits records. + if not any(isinstance(h, logging.NullHandler) for h in logger.handlers): + logger.addHandler(logging.NullHandler()) load_dotenv() @@ -684,21 +690,22 @@ class SRUtil: except Exception as e: # Exponential backoff with jitter for 429 or other errors delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5) - logging.warning( - "Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs", - track_id, - attempt, - self.MAX_METADATA_RETRIES, - str(e), - delay, - ) if attempt < self.MAX_METADATA_RETRIES: + logging.warning( + "Retrying metadata fetch for track %s (attempt %d/%d): %s. Next retry in %.2fs", + track_id, + attempt, + self.MAX_METADATA_RETRIES, + str(e), + delay, + ) await asyncio.sleep(delay) else: logging.error( - "Metadata fetch failed permanently for track %s after %d attempts", + "Metadata fetch failed permanently for track %s after %d attempts: %s", track_id, self.MAX_METADATA_RETRIES, + str(e), ) # Raise a specific exception so callers can react (e.g. notify) raise MetadataFetchError(