"""FastAPI endpoints that persist a lighting state in Redis and apply it to a Cync device."""

import logging
import json
import os
import time
import aiohttp
import asyncio
import traceback
from datetime import datetime
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
import redis
from lyric_search.sources import private
from auth.deps import get_current_user
from dotenv import load_dotenv
from pycync.user import User  # type: ignore
from pycync.cync import Cync as Cync  # type: ignore
from pycync import Auth  # type: ignore
from pycync.exceptions import TwoFactorRequiredError, AuthFailedError  # type: ignore
import inspect
import getpass
from typing import Optional

# Configure logging to write to a file for specific events
logging.basicConfig(
    filename="cync_auth_events.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Channel names used both for the flat request form and in validation errors.
_RGB_CHANNEL_NAMES = ("red", "green", "blue")


def _mask_token(token: Optional[str]) -> str:
    """Mask sensitive token data for logging, showing only first/last 4 chars.

    Returns an empty string for missing tokens or tokens shorter than 8 chars.
    """
    if not token or len(token) < 8:
        return ""
    return f"{token[:4]}...{token[-4:]}"


def _log_token_state(user, context: str):
    """Log the masked token state of ``user``, prefixed with ``context``."""
    if not user:
        logging.info("%s - No user object available", context)
        return
    try:
        # ``context`` is passed as an argument rather than interpolated into the
        # format string so that a literal '%' in it cannot break formatting.
        logging.info(
            "%s - Token state: access=%s refresh=%s expires_at=%s",
            context,
            _mask_token(getattr(user, "access_token", None)),
            _mask_token(getattr(user, "refresh_token", None)),
            getattr(user, "expires_at", None),
        )
    except Exception as e:
        logging.error("Error logging token state: %s", e)


def _write_diagnostic_file(prefix: str, diagnostic_data: dict, description: str) -> None:
    """Best-effort dump of ``diagnostic_data`` to ``<prefix>-<unix time>.json``.

    Failures are logged and swallowed: diagnostics must never mask the
    original error that is being reported.
    """
    diagnostic_file = f"{prefix}-{int(time.time())}.json"
    try:
        with open(diagnostic_file, "w", encoding="utf-8") as f:
            json.dump(diagnostic_data, f, indent=2)
        logging.info("Saved %s to %s", description, diagnostic_file)
    except Exception as save_error:
        logging.error("Failed to save diagnostic data: %s", save_error)


def _extract_rgb(state: dict) -> Optional[tuple]:
    """Return a validated ``(r, g, b)`` tuple from ``state``, or ``None``.

    Accepts either ``{"color": {"r": .., "g": .., "b": ..}}`` or flat
    ``red``/``green``/``blue`` keys. Both forms are range-checked.

    Raises:
        HTTPException: 400 if any channel is not an int in ``0..255``.
    """
    color = state.get("color")
    if color and isinstance(color, dict) and all(k in color for k in ("r", "g", "b")):
        rgb = (color["r"], color["g"], color["b"])
    elif all(k in state for k in _RGB_CHANNEL_NAMES):
        rgb = (state["red"], state["green"], state["blue"])
    else:
        return None

    for val, name in zip(rgb, _RGB_CHANNEL_NAMES):
        if not isinstance(val, int) or not (0 <= val <= 255):
            raise HTTPException(
                status_code=400, detail=f"Invalid {name} color value: {val}"
            )
    return rgb


class Lighting(FastAPI):
    """
    Lighting Endpoints
    """

    # NOTE(review): this class subclasses FastAPI but __init__ never calls
    # super().__init__(); routes are registered on the ``app`` passed in.
    # Left as-is because callers construct it with (app, util, constants).

    def _require_cync_config(self) -> None:
        """Raise if any of the required Cync environment variables is unset."""
        required = (
            ("CYNC_EMAIL", self.cync_email),
            ("CYNC_PASSWORD", self.cync_password),
            ("CYNC_DEVICE_NAME", self.cync_device_name),
        )
        missing_vars = [name for name, value in required if not value]
        if missing_vars:
            raise Exception(
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )

    def _clear_token_cache(self, success_message: str) -> None:
        """Delete the token cache file; log ``success_message`` only if it was removed."""
        try:
            os.remove(self.token_cache_path)
        except OSError:
            # Covers FileNotFoundError too; a missing cache is not an error.
            return
        logging.info(success_message)

    async def _complete_two_factor_login(self, login) -> None:
        """Finish a login that raised ``TwoFactorRequiredError``.

        The code is read from ``CYNC_2FA_CODE`` or, if unset, prompted for on
        the terminal. ``login`` is the auth object's login callable.

        Raises:
            Exception: if no code is available or the 2FA login is rejected.
        """
        twofa_code = os.getenv("CYNC_2FA_CODE")
        if not twofa_code:
            # NOTE(review): getpass blocks the event loop while waiting for
            # input; kept because interactive entry is the existing behaviour.
            print("Cync 2FA required. Please enter your code:")
            twofa_code = getpass.getpass("2FA Code: ")
        if not twofa_code:
            logging.error("Cync 2FA required but no code provided.")
            raise Exception("Cync 2FA required.")

        logging.info("Retrying Cync login with 2FA code.")
        try:
            result = login(two_factor_code=twofa_code)
            if inspect.isawaitable(result):
                result = await result
            self.cync_user = result
            self._save_cached_user(self.cync_user)
            logging.info("Logged in with 2FA successfully.")
        except Exception as e:
            logging.error("Cync 2FA login failed: %s", e)
            # The 2FA code and raw user object are secrets: never log them
            # verbatim. Only masked token state is recorded.
            logging.info("2FA failure details: code redacted")
            _log_token_state(self.cync_user, "2FA failure")
            raise Exception("Cync 2FA code invalid or not accepted.") from e

    async def _close_session_safely(self):
        """Safely close the current session if it exists, then reset connection state."""
        if self.session and not getattr(self.session, "closed", True):
            try:
                await self.session.close()
                # Wait a bit for the underlying connections to close
                await asyncio.sleep(0.1)
            except Exception as e:
                logging.warning(f"Error closing session: {e}")
        self.session = None
        self.auth = None
        self.cync_api = None

    async def _test_connection_health(self) -> bool:
        """Return True if an API object and open session exist and devices can be listed."""
        if (
            not self.cync_api
            or not self.session
            or getattr(self.session, "closed", True)
        ):
            return False
        try:
            devices = self.cync_api.get_devices()
            # Just check if we get a response without errors
            return devices is not None
        except Exception as e:
            logging.warning(f"Connection health check failed: {e}")
            return False

    async def ensure_cync_connection(self, force_reconnect: bool = False):
        """Ensure aiohttp session and Cync API are alive, re-create if needed.

        Args:
            force_reconnect: rebuild the session/auth/API even if the current
                connection passes the health check.

        Raises:
            Exception: on missing configuration, failed authentication, failed
                API creation, or if the connection is unusable after setup.
        """
        self._require_cync_config()

        # Cast to str after check to silence linter
        cync_email: str = self.cync_email  # type: ignore
        cync_password: str = self.cync_password  # type: ignore

        # If force_reconnect is True or connection is unhealthy, rebuild everything
        if force_reconnect or not await self._test_connection_health():
            logging.info(
                "Connection unhealthy or force reconnect requested. Rebuilding connection..."
            )

            # Clean up existing connection
            await self._close_session_safely()

            # Create new session with timeout configuration
            timeout = aiohttp.ClientTimeout(total=30, connect=10)
            connector = aiohttp.TCPConnector(
                limit=100,
                limit_per_host=30,
                ttl_dns_cache=300,
                use_dns_cache=True,
                keepalive_timeout=60,
                enable_cleanup_closed=True,
            )
            self.session = aiohttp.ClientSession(timeout=timeout, connector=connector)

            # Load cached token and check validity
            self.cync_user = None
            cached_user = self._load_cached_user()
            token_status = None

            if cached_user and hasattr(cached_user, "expires_at"):
                # Add buffer time - consider token expired if less than 5 minutes remaining
                buffer_time = 300  # 5 minutes
                if cached_user.expires_at > (time.time() + buffer_time):
                    token_status = "valid"
                else:
                    token_status = "expired"
            else:
                token_status = "no cached user or missing expires_at"

            logging.info(f"Cync token status: {token_status}")

            if token_status == "valid" and cached_user is not None:
                # Use cached token
                self.auth = Auth(
                    session=self.session,
                    user=cached_user,
                    username=cync_email,
                    password=cync_password,
                )
                self.cync_user = cached_user
                logging.info("Reusing valid cached token, no 2FA required.")
            else:
                # Need fresh login - clear any cached user that's expired
                if token_status == "expired":
                    self._clear_token_cache("Removed expired token cache")

                logging.info("Initializing new Auth instance...")
                self.auth = Auth(
                    session=self.session,
                    username=cync_email,
                    password=cync_password,
                )
                try:
                    logging.info("Attempting fresh login...")
                    self.cync_user = await self.auth.login()
                    _log_token_state(self.cync_user, "After fresh login")
                    self._save_cached_user(self.cync_user)
                    logging.info("Fresh login successful")
                except TwoFactorRequiredError:
                    await self._complete_two_factor_login(self.auth.login)
                except AuthFailedError as e:
                    logging.error("Failed to authenticate with Cync API: %s", e)
                    raise Exception("Cync authentication failed.") from e

            # Create new Cync API instance
            try:
                logging.info("Creating Cync API instance...")
                _log_token_state(self.auth.user, "Before Cync.create")

                # Check if token needs refresh before API creation
                now = time.time()
                expires_at = getattr(self.auth.user, "expires_at", 0)
                time_until_expiry = expires_at - now
                logging.info(
                    f"Token expires in {int(time_until_expiry / 60)} minutes (at {datetime.fromtimestamp(expires_at).isoformat()})"
                )

                # Always try refresh if we're reusing a cached token
                if token_status == "valid":
                    logging.info("Testing cached token with refresh attempt")
                    try:
                        refresh = getattr(self.auth, "async_refresh_user_token", None)
                        if callable(refresh):
                            # Log session state before refresh
                            if hasattr(self.session, "cookie_jar"):
                                try:
                                    cookie_count = len(self.session.cookie_jar)
                                    logging.info(
                                        f"Session has {cookie_count} cookies before refresh"
                                    )
                                except Exception as cookie_e:
                                    logging.warning(
                                        f"Could not check cookies: {cookie_e}"
                                    )

                            result = refresh()
                            if inspect.isawaitable(result):
                                try:
                                    await result
                                    logging.info("Token refresh test succeeded")
                                    # Persist whatever the refresh produced so a
                                    # restart does not reuse stale tokens.
                                    # Assumes refresh updates auth.user — TODO
                                    # confirm against pycync.
                                    refreshed_user = getattr(self.auth, "user", None)
                                    if refreshed_user is not None:
                                        self._save_cached_user(refreshed_user)
                                except AuthFailedError:
                                    logging.warning(
                                        "Cached token rejected by server despite being valid locally"
                                    )
                                    # Clear cached token and force fresh login
                                    self._clear_token_cache(
                                        "Cleared rejected token cache"
                                    )
                                    logging.info(
                                        "Attempting fresh login after refresh rejection..."
                                    )
                                    self.auth = Auth(
                                        session=self.session,
                                        username=cync_email,
                                        password=cync_password,
                                    )
                                    self.cync_user = await self.auth.login()
                                    self._save_cached_user(self.cync_user)
                                    logging.info(
                                        "Fresh login successful after refresh rejection"
                                    )
                            else:
                                logging.warning(
                                    "Refresh method returned non-awaitable result"
                                )
                    except Exception as refresh_e:
                        # Deliberately best-effort: API creation below is still
                        # attempted with whatever auth state we have.
                        logging.error(f"Pre-API refresh failed: {refresh_e}")
                        logging.error(
                            "Refresh error traceback:\n%s", traceback.format_exc()
                        )

                self.cync_api = await Cync.create(self.auth)
                logging.info("Cync API connection established successfully")
            except Exception as e:
                logging.error("Failed to create Cync API instance")
                logging.error("Exception details: %s", str(e))
                logging.error("Traceback:\n%s", traceback.format_exc())

                # Save diagnostic info (tokens masked)
                auth = self.auth
                auth_user = auth.user if auth else None
                if auth and auth_user:
                    user_state = {
                        "access_token": _mask_token(
                            getattr(auth_user, "access_token", None)
                        ),
                        "refresh_token": _mask_token(
                            getattr(auth_user, "refresh_token", None)
                        ),
                        "expires_at": getattr(auth_user, "expires_at", None),
                        "time_until_expiry_minutes": int(
                            (getattr(auth_user, "expires_at", 0) - time.time()) / 60
                        ),
                        "refresh_method_exists": hasattr(
                            auth, "async_refresh_user_token"
                        ),
                        "refresh_method_callable": callable(
                            getattr(auth, "async_refresh_user_token", None)
                        ),
                    }
                else:
                    user_state = None

                diagnostic_data = {
                    "timestamp": datetime.now().isoformat(),
                    "error_type": type(e).__name__,
                    "error_message": str(e),
                    "auth_state": {
                        "has_auth": bool(auth),
                        "has_user": bool(getattr(auth, "user", None)),
                        "user_state": user_state,
                    },
                }
                _write_diagnostic_file(
                    "cync_api_failure", diagnostic_data, "API creation diagnostic data"
                )
                raise

        # Final validation
        if (
            not self.cync_api
            or not self.session
            or getattr(self.session, "closed", True)
        ):
            logging.error("Connection validation failed after setup")
            _log_token_state(
                getattr(self.auth, "user", None), "Failed connection validation"
            )
            raise Exception("Failed to establish proper Cync connection")

    def __init__(self, app: FastAPI, util, constants) -> None:
        """Initialize Lighting endpoints and persistent Cync connection.

        Registers startup/shutdown hooks and the ``/lighting/state`` GET and
        POST routes on ``app``.
        """
        load_dotenv()

        self.app: FastAPI = app
        self.util = util
        self.constants = constants
        self.redis_client = redis.Redis(
            password=private.REDIS_PW, decode_responses=True
        )
        self.lighting_key = "lighting:state"

        # Cync config
        self.cync_email = os.getenv("CYNC_EMAIL")
        self.cync_password = os.getenv("CYNC_PASSWORD")
        self.cync_device_name = os.getenv("CYNC_DEVICE_NAME")
        self.token_cache_path = "cync_token.json"
        self.session = None
        self.auth = None
        self.cync_user = None
        self.cync_api = None
        self.health_check_task: Optional[asyncio.Task] = None

        # Set up Cync connection at startup using FastAPI event
        @app.on_event("startup")
        async def startup_event():
            # Missing configuration is fatal at startup (raised outside the try).
            self._require_cync_config()

            # Use ensure_cync_connection which has proper token caching
            try:
                await self.ensure_cync_connection()
                logging.info("Cync lighting system initialized successfully")
            except Exception as e:
                logging.error(f"Failed to initialize Cync connection at startup: {e}")
                # Don't raise - allow server to start, connection will be retried on first request

            # Schedule periodic token validation and connection health checks
            self.health_check_task = asyncio.create_task(self._schedule_health_checks())

        @app.on_event("shutdown")
        async def shutdown_event():
            # Cancel health check task
            if self.health_check_task and not self.health_check_task.done():
                self.health_check_task.cancel()
                try:
                    await self.health_check_task
                except asyncio.CancelledError:
                    logging.info("Health check task cancelled successfully")

            # Clean up connections
            await self._close_session_safely()
            logging.info("Cync lighting system shut down cleanly")

        # Register endpoints
        self.endpoints: dict = {
            "lighting/state": self.get_lighting_state,
        }

        for endpoint, handler in self.endpoints.items():
            self.app.add_api_route(
                f"/{endpoint}",
                handler,
                methods=["GET"],
                include_in_schema=True,
                dependencies=[
                    Depends(RateLimiter(times=25, seconds=2)),
                    Depends(get_current_user),
                ],
            )

        self.app.add_api_route(
            "/lighting/state",
            self.set_lighting_state,
            methods=["POST"],
            include_in_schema=True,
            dependencies=[
                Depends(RateLimiter(times=25, seconds=2)),
                Depends(get_current_user),
            ],
        )

    async def _refresh_or_login(self):
        """Refresh the current token if it is still valid, otherwise log in again.

        A successful refresh returns immediately; a full login is only
        attempted when there is no unexpired token to refresh (or no refresh
        method is available).

        Raises:
            Exception: if auth is uninitialised, the refresh is rejected, or
                the login (including 2FA) fails.
        """
        if not self.auth:
            logging.error("Auth object is not initialized.")
            raise Exception("Cync authentication not initialized.")
        try:
            user = getattr(self.auth, "user", None)
            _log_token_state(user, "Before refresh attempt")

            if user and hasattr(user, "expires_at") and user.expires_at > time.time():
                refresh = getattr(self.auth, "async_refresh_user_token", None)
                if callable(refresh):
                    try:
                        logging.info("Attempting token refresh...")
                        result = refresh()
                        if inspect.isawaitable(result):
                            await result
                            logging.info(
                                "Token refresh completed successfully (awaited)"
                            )
                        else:
                            logging.info("Token refresh completed (non-awaitable)")
                    except AuthFailedError as e:
                        logging.error("Token refresh failed with AuthFailedError")
                        logging.error("Exception details: %s", str(e))
                        logging.error("Traceback:\n%s", traceback.format_exc())

                        # Save diagnostic info to file
                        diagnostic_data = {
                            "timestamp": datetime.now().isoformat(),
                            "error_type": "AuthFailedError",
                            "error_message": str(e),
                            "user_state": {
                                "access_token": _mask_token(
                                    getattr(user, "access_token", None)
                                ),
                                "refresh_token": _mask_token(
                                    getattr(user, "refresh_token", None)
                                ),
                                "expires_at": getattr(user, "expires_at", None),
                            },
                        }
                        _write_diagnostic_file(
                            "cync_auth_failure", diagnostic_data, "diagnostic data"
                        )
                        raise
                    else:
                        # Refresh worked: keep the refreshed tokens and stop.
                        # Falling through to login() here would discard the
                        # refresh and could trigger a new 2FA challenge.
                        # Assumes refresh updates auth.user — TODO confirm
                        # against pycync.
                        refreshed_user = getattr(self.auth, "user", None)
                        if refreshed_user is not None:
                            self.cync_user = refreshed_user
                            self._save_cached_user(refreshed_user)
                        return

            login = getattr(self.auth, "login", None)
            if callable(login):
                try:
                    result = login()
                    if inspect.isawaitable(result):
                        self.cync_user = await result
                    else:
                        self.cync_user = result
                    self._save_cached_user(self.cync_user)
                    logging.info("Logged in successfully.")
                except TwoFactorRequiredError:
                    await self._complete_two_factor_login(login)
            else:
                raise Exception("Auth object missing login method.")
        except AuthFailedError as e:
            logging.error("Failed to authenticate with Cync API: %s", e)
            raise Exception("Cync authentication failed.") from e
        except Exception as e:
            logging.error("Unexpected error during authentication: %s", e)
            raise

    async def _schedule_health_checks(self):
        """Periodic health checks and token validation (runs until cancelled)."""
        while True:
            try:
                await asyncio.sleep(300)  # Check every 5 minutes

                # Check token expiration (refresh if less than 10 minutes left)
                if self.cync_user and hasattr(self.cync_user, "expires_at"):
                    expires_at = getattr(self.cync_user, "expires_at", 0)
                    time_until_expiry = expires_at - time.time()
                    if time_until_expiry < 600:  # Less than 10 minutes
                        logging.info(
                            f"Token expires in {int(time_until_expiry / 60)} minutes. Refreshing..."
                        )
                        try:
                            await self._refresh_or_login()
                        except Exception as e:
                            logging.error(
                                f"Token refresh failed during health check: {e}"
                            )

                # Test connection health
                if not await self._test_connection_health():
                    logging.warning(
                        "Connection health check failed. Will reconnect on next API call."
                    )

            except asyncio.CancelledError:
                logging.info("Health check task cancelled")
                break
            except Exception as e:
                logging.error(f"Error during periodic health check: {e}")
                # Continue the loop even on errors

    def _load_cached_user(self):
        """Load the cached Cync user from ``token_cache_path``.

        Returns:
            A ``User`` built from the cache file, or ``None`` if the file is
            missing or cannot be read/parsed.
        """
        try:
            if os.path.exists(self.token_cache_path):
                with open(self.token_cache_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                return User(
                    access_token=data["access_token"],
                    refresh_token=data["refresh_token"],
                    authorize=data["authorize"],
                    user_id=data["user_id"],
                    expires_at=data["expires_at"],
                )
        except Exception as e:
            logging.warning("Failed to load cached Cync user: %s", e)
        return None

    def _save_cached_user(self, user):
        """Write ``user``'s tokens to ``token_cache_path`` (best-effort; failures are logged)."""
        try:
            data = {
                "access_token": user.access_token,
                "refresh_token": user.refresh_token,
                "authorize": user.authorize,
                "user_id": user.user_id,
                "expires_at": user.expires_at,
            }
            with open(self.token_cache_path, "w", encoding="utf-8") as f:
                json.dump(data, f)
            logging.info("Saved Cync user tokens to disk.")
        except Exception as e:
            logging.warning("Failed to save Cync user tokens: %s", e)

    async def get_lighting_state(self) -> JSONResponse:
        """
        Get the current lighting state.

        Returns:
        - **JSONResponse**: Contains the current lighting state, or a default
          state when nothing is stored in Redis.
        """
        try:
            state = self.redis_client.get(self.lighting_key)
            if state:
                return JSONResponse(content=json.loads(str(state)))

            # Default state
            default_state = {
                "power": "off",
                "brightness": 50,
                "color": {"r": 255, "g": 255, "b": 255},
            }
            return JSONResponse(content=default_state)
        except Exception as e:
            logging.error("Error getting lighting state: %s", e)
            raise HTTPException(
                status_code=500, detail="Internal server error"
            ) from e

    async def set_lighting_state(self, request: Request) -> JSONResponse:
        """
        Set the lighting state and apply it to the Cync device.

        The request body is validated first; only a valid state is stored in
        Redis and sent to the device (with up to three attempts).

        Raises:
        - **HTTPException**: 400 for an invalid state, 500 for any other failure.
        """
        logging.info("=== LIGHTING STATE REQUEST RECEIVED ===")
        try:
            state = await request.json()
            logging.info(f"Requested state: {state}")

            # Validate everything before touching Redis or the device so that
            # an invalid payload is never persisted.
            if not isinstance(state, dict):
                raise HTTPException(
                    status_code=400, detail="State must be a JSON object"
                )

            power = state.get("power", "off")
            if power not in ["on", "off"]:
                raise HTTPException(
                    status_code=400, detail=f"Invalid power state: {power}"
                )

            brightness = state.get("brightness", 50)
            if not isinstance(brightness, (int, float)) or not (0 <= brightness <= 100):
                raise HTTPException(
                    status_code=400, detail=f"Invalid brightness: {brightness}"
                )

            rgb = _extract_rgb(state)

            # Store in Redis
            self.redis_client.set(self.lighting_key, json.dumps(state))

            await self.ensure_cync_connection()

            # Apply to Cync device with robust retry and error handling
            max_retries = 3
            last_exception: Exception = Exception("No attempts made")

            for attempt in range(max_retries):
                is_last_attempt = attempt >= max_retries - 1
                try:
                    # Ensure connection before each attempt
                    force_reconnect = attempt > 0  # Force reconnect on retries
                    await self.ensure_cync_connection(force_reconnect=force_reconnect)

                    if not self.cync_api:
                        raise Exception("Cync API not available after connection setup")

                    logging.info(
                        f"Attempt {attempt + 1}/{max_retries}: Getting devices from Cync API..."
                    )
                    devices = self.cync_api.get_devices()

                    if not devices:
                        raise Exception("No devices returned from Cync API")

                    logging.info(
                        f"Devices returned: {[getattr(d, 'name', 'unnamed') for d in devices]}"
                    )

                    light = next(
                        (
                            d
                            for d in devices
                            if hasattr(d, "name") and d.name == self.cync_device_name
                        ),
                        None,
                    )

                    if not light:
                        available_devices = [
                            getattr(d, "name", "unnamed") for d in devices
                        ]
                        raise Exception(
                            f"Device '{self.cync_device_name}' not found. Available devices: {available_devices}"
                        )

                    logging.info(
                        f"Selected device: {getattr(light, 'name', 'unnamed')}"
                    )

                    # Execute device operations
                    operations_completed = []

                    # Set power
                    if power == "on":
                        result = await light.turn_on()
                        operations_completed.append(f"turn_on: {result}")
                    else:
                        result = await light.turn_off()
                        operations_completed.append(f"turn_off: {result}")

                    # Set brightness (only when the caller supplied one)
                    if "brightness" in state:
                        result = await light.set_brightness(brightness)
                        operations_completed.append(
                            f"set_brightness({brightness}): {result}"
                        )

                    # Set color
                    if rgb:
                        result = await light.set_rgb(rgb)
                        operations_completed.append(f"set_rgb({rgb}): {result}")

                    logging.info(
                        f"All operations completed successfully: {operations_completed}"
                    )
                    break  # Success, exit retry loop

                except (
                    aiohttp.ClientConnectionError,
                    aiohttp.ClientOSError,
                    aiohttp.ServerDisconnectedError,
                    aiohttp.ClientConnectorError,
                    ConnectionResetError,
                    ConnectionError,
                    OSError,
                    asyncio.TimeoutError,
                ) as e:
                    last_exception = e
                    logging.warning(
                        f"Connection/network error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
                    )
                    if not is_last_attempt:
                        # Exponential backoff before retrying: 1s, then 2s
                        await asyncio.sleep(2**attempt)
                        continue

                except (AuthFailedError, TwoFactorRequiredError) as e:
                    last_exception = e
                    logging.error(
                        f"Authentication error (attempt {attempt + 1}/{max_retries}): {e}"
                    )
                    if not is_last_attempt:
                        # Clear cached tokens on auth errors
                        self._clear_token_cache(
                            "Cleared token cache due to auth error"
                        )
                        await asyncio.sleep(1)
                        continue

                except Exception as e:
                    last_exception = e
                    error_msg = f"Unexpected error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
                    logging.error(error_msg)

                    # On unexpected errors, try reconnecting for next attempt
                    if not is_last_attempt:
                        logging.warning(
                            "Forcing full reconnection due to unexpected error..."
                        )
                        await asyncio.sleep(1)
                        continue

                # Only reached when the final attempt raised: all retries failed
                logging.error(
                    f"All {max_retries} attempts failed. Last error: {type(last_exception).__name__}: {last_exception}"
                )
                raise last_exception

            logging.info(
                "Successfully applied state to device '%s': %s",
                self.cync_device_name,
                state,
            )
            return JSONResponse(
                content={
                    "message": "Lighting state updated and applied",
                    "state": state,
                }
            )
        except HTTPException:
            raise
        except Exception as e:
            logging.error("Error setting lighting state: %s", e)
            raise HTTPException(
                status_code=500, detail="Internal server error"
            ) from e