import logging import json import os import time import aiohttp import asyncio from fastapi import FastAPI, Depends, HTTPException, Request from fastapi_throttle import RateLimiter from fastapi.responses import JSONResponse import redis from lyric_search.sources import private from auth.deps import get_current_user from dotenv import load_dotenv from pycync.user import User # type: ignore from pycync.cync import Cync as Cync # type: ignore from pycync import Auth # type: ignore from pycync.exceptions import TwoFactorRequiredError, AuthFailedError # type: ignore import inspect import getpass class Lighting(FastAPI): async def ensure_cync_connection(self): """Ensure aiohttp session and Cync API are alive, re-create if needed.""" # Check required environment variables missing_vars = [] if not self.cync_email: missing_vars.append("CYNC_EMAIL") if not self.cync_password: missing_vars.append("CYNC_PASSWORD") if not self.cync_device_name: missing_vars.append("CYNC_DEVICE_NAME") if missing_vars: raise Exception( f"Missing required environment variables: {', '.join(missing_vars)}" ) # Cast to str after check to silence linter cync_email: str = self.cync_email # type: ignore cync_password: str = self.cync_password # type: ignore # Check if session is closed or missing if not self.session or getattr(self.session, "closed", False): self.session = aiohttp.ClientSession() # Load cached token and check validity self.cync_user = None cached_user = self._load_cached_user() token_status = None if cached_user: if hasattr(cached_user, "expires_at"): if cached_user.expires_at > time.time(): token_status = "valid" else: token_status = "expired" else: token_status = "missing expires_at" else: token_status = "no cached user" logging.info(f"Cync token status: {token_status}") if token_status == "valid" and cached_user is not None: # Use cached token self.auth = Auth( session=self.session, user=cached_user, username=cync_email, password=cync_password, ) self.cync_user = cached_user logging.info("Reusing valid cached token, no 2FA required.") else: # Need fresh login self.auth = Auth( session=self.session, username=cync_email, password=cync_password, ) try: self.cync_user = await self.auth.login() self._save_cached_user(self.cync_user) except TwoFactorRequiredError: twofa_code = os.getenv("CYNC_2FA_CODE") if not twofa_code: print("Cync 2FA required. Please enter your code:") twofa_code = getpass.getpass("2FA Code: ") if twofa_code: logging.info("Retrying Cync login with 2FA code.") try: self.cync_user = await self.auth.login(two_factor_code=twofa_code) self._save_cached_user(self.cync_user) logging.info("Logged in with 2FA successfully.") except Exception as e: logging.error("Cync 2FA login failed: %s", e) raise Exception("Cync 2FA code invalid or not accepted.") else: logging.error("Cync 2FA required but no code provided.") raise Exception("Cync 2FA required.") except AuthFailedError as e: logging.error("Failed to authenticate with Cync API: %s", e) raise Exception("Cync authentication failed.") self.cync_api = await Cync.create(self.auth) # Also check if cync_api is None (shouldn't happen, but just in case) if not self.cync_api: if not self.auth: logging.critical("self.auth: %s", self.auth) return self.cync_api = await Cync.create(self.auth) """ Lighting Endpoints """ def __init__(self, app: FastAPI, util, constants) -> None: """Initialize Lighting endpoints and persistent Cync connection.""" load_dotenv() self.app: FastAPI = app self.util = util self.constants = constants self.redis_client = redis.Redis( password=private.REDIS_PW, decode_responses=True ) self.lighting_key = "lighting:state" # Cync config self.cync_email = os.getenv("CYNC_EMAIL") self.cync_password = os.getenv("CYNC_PASSWORD") self.cync_device_name = os.getenv("CYNC_DEVICE_NAME") self.token_cache_path = "cync_token.json" self.session = None self.auth = None self.cync_user = None self.cync_api = None # Set up Cync connection at startup using FastAPI event @app.on_event("startup") async def startup_event(): # Check required environment variables missing_vars = [] if not self.cync_email: missing_vars.append("CYNC_EMAIL") if not self.cync_password: missing_vars.append("CYNC_PASSWORD") if not self.cync_device_name: missing_vars.append("CYNC_DEVICE_NAME") if missing_vars: raise Exception( f"Missing required environment variables: {', '.join(missing_vars)}" ) # Use ensure_cync_connection which has proper token caching await self.ensure_cync_connection() # Create persistent Cync API object if self.auth: self.cync_api = await Cync.create(self.auth) # Schedule periodic token validation asyncio.create_task(self._schedule_token_validation()) # Register endpoints self.endpoints: dict = { "lighting/state": self.get_lighting_state, } for endpoint, handler in self.endpoints.items(): self.app.add_api_route( f"/{endpoint}", handler, methods=["GET"], include_in_schema=True, dependencies=[ Depends(RateLimiter(times=25, seconds=2)), Depends(get_current_user), ], ) self.app.add_api_route( "/lighting/state", self.set_lighting_state, methods=["POST"], include_in_schema=True, dependencies=[ Depends(RateLimiter(times=25, seconds=2)), Depends(get_current_user), ], ) async def _refresh_or_login(self): if not self.auth: logging.error("Auth object is not initialized.") raise Exception("Cync authentication not initialized.") try: user = getattr(self.auth, 'user', None) if user and hasattr(user, "expires_at") and user.expires_at > time.time(): refresh = getattr(self.auth, 'async_refresh_user_token', None) if callable(refresh): try: result = refresh() if inspect.isawaitable(result): await result else: pass # do nothing if not awaitable except AuthFailedError as e: logging.warning("Token refresh failed: %s", e) login = getattr(self.auth, 'login', None) if callable(login): try: result = login() if inspect.isawaitable(result): self.cync_user = await result else: self.cync_user = result self._save_cached_user(self.cync_user) logging.info("Logged in successfully.") except TwoFactorRequiredError: twofa_code = os.getenv("CYNC_2FA_CODE") if not twofa_code: # Prompt interactively if not set print("Cync 2FA required. Please enter your code:") twofa_code = getpass.getpass("2FA Code: ") if twofa_code: logging.info("Retrying Cync login with 2FA code.") try: result = login(two_factor_code=twofa_code) if inspect.isawaitable(result): self.cync_user = await result else: self.cync_user = result self._save_cached_user(self.cync_user) logging.info("Logged in with 2FA successfully.") except Exception as e: logging.error("Cync 2FA login failed: %s", e) raise Exception("Cync 2FA code invalid or not accepted.") else: logging.error("Cync 2FA required but no code provided.") raise Exception("Cync 2FA required.") else: raise Exception("Auth object missing login method.") except AuthFailedError as e: logging.error("Failed to authenticate with Cync API: %s", e) raise Exception("Cync authentication failed.") except Exception as e: logging.error("Unexpected error during authentication: %s", e) raise async def _schedule_token_validation(self): while True: try: await asyncio.sleep(300) user = getattr(self.auth, 'user', None) if user and hasattr(user, "expires_at") and user.expires_at - time.time() < 600: logging.info("Token is about to expire. Refreshing...") await self._refresh_or_login() except Exception as e: logging.error("Error during periodic token validation: %s", e) def _load_cached_user(self): try: if os.path.exists(self.token_cache_path): with open(self.token_cache_path, "r") as f: data = json.load(f) return User( access_token=data["access_token"], refresh_token=data["refresh_token"], authorize=data["authorize"], user_id=data["user_id"], expires_at=data["expires_at"], ) except Exception as e: logging.warning("Failed to load cached Cync user: %s", e) return None def _save_cached_user(self, user): try: data = { "access_token": user.access_token, "refresh_token": user.refresh_token, "authorize": user.authorize, "user_id": user.user_id, "expires_at": user.expires_at, } with open(self.token_cache_path, "w") as f: json.dump(data, f) logging.info("Saved Cync user tokens to disk.") except Exception as e: logging.warning("Failed to save Cync user tokens: %s", e) async def get_lighting_state(self) -> JSONResponse: """ Get the current lighting state. Returns: - **JSONResponse**: Contains the current lighting state. """ try: state = self.redis_client.get(self.lighting_key) if state: return JSONResponse(content=json.loads(str(state))) else: # Default state default_state = { "power": "off", "brightness": 50, "color": {"r": 255, "g": 255, "b": 255}, } return JSONResponse(content=default_state) except Exception as e: logging.error("Error getting lighting state: %s", e) raise HTTPException(status_code=500, detail="Internal server error") async def set_lighting_state(self, request: Request) -> JSONResponse: """ Set the lighting state and apply it to the Cync device. """ logging.info("=== LIGHTING STATE REQUEST RECEIVED ===") try: state = await request.json() logging.info(f"Requested state: {state}") # Validate state (basic validation) if not isinstance(state, dict): raise HTTPException( status_code=400, detail="State must be a JSON object" ) # Store in Redis self.redis_client.set(self.lighting_key, json.dumps(state)) await self.ensure_cync_connection() # Validate and extract state values power = state.get("power", "off") if power not in ["on", "off"]: raise HTTPException( status_code=400, detail=f"Invalid power state: {power}" ) brightness = state.get("brightness", 50) if not isinstance(brightness, (int, float)) or not (0 <= brightness <= 100): raise HTTPException( status_code=400, detail=f"Invalid brightness: {brightness}" ) color = state.get("color") if ( color and isinstance(color, dict) and all(k in color for k in ["r", "g", "b"]) ): rgb = (color["r"], color["g"], color["b"]) elif all(k in state for k in ["red", "green", "blue"]): rgb = (state["red"], state["green"], state["blue"]) for val, name in zip(rgb, ["red", "green", "blue"]): if not isinstance(val, int) or not (0 <= val <= 255): raise HTTPException( status_code=400, detail=f"Invalid {name} color value: {val}" ) else: rgb = None # Apply to Cync device with retry on connection issues max_retries = 2 for attempt in range(max_retries): try: # Use persistent Cync API object if not self.cync_api: logging.warning("Cync API not initialized, attempting to reconnect...") await self.ensure_cync_connection() if not self.cync_api: raise Exception("Cync API still not initialized after reconnection.") logging.info("Getting devices from Cync API...") devices = self.cync_api.get_devices() logging.info(f"Devices returned from Cync API: {[getattr(d, 'name', None) for d in devices]}") light = next( ( d for d in devices if hasattr(d, "name") and d.name == self.cync_device_name ), None, ) if not light: logging.error(f"Device '{self.cync_device_name}' not found in {[getattr(d, 'name', None) for d in devices]}") raise Exception(f"Device '{self.cync_device_name}' not found") logging.info(f"Selected device: {light}") # Set power if power == "on": result = await light.turn_on() logging.info(f"turn_on result: {result}") else: result = await light.turn_off() logging.info(f"turn_off result: {result}") # Set brightness if "brightness" in state: result = await light.set_brightness(brightness) logging.info(f"set_brightness result: {result}") # Set color if rgb: result = await light.set_rgb(rgb) logging.info(f"set_rgb result: {result}") break # Success, exit retry loop except (aiohttp.ClientConnectionError, aiohttp.ClientOSError) as e: if attempt < max_retries - 1: logging.warning( "Connection closed (attempt %d/%d): %s. Retrying with reconnection.", attempt + 1, max_retries, e, ) await self.ensure_cync_connection() else: logging.error( "Connection failed after %d attempts: %s", max_retries, e, ) raise except Exception as e: logging.error("Unexpected error during device operation: %s", e) logging.error("Error type: %s", type(e).__name__) # Try to reconnect on any error for next attempt if attempt < max_retries - 1: logging.warning("Attempting reconnection due to error...") await self.ensure_cync_connection() else: raise logging.info( "Successfully applied state to device '%s': %s", self.cync_device_name, state, ) return JSONResponse( content={ "message": "Lighting state updated and applied", "state": state, } ) except HTTPException: raise except Exception as e: logging.error("Error setting lighting state: %s", e) raise HTTPException(status_code=500, detail="Internal server error")