import json
import logging
import os
import time

import aiohttp
import redis
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi_throttle import RateLimiter
from pycync import Auth  # type: ignore
from pycync.cync import Cync as Cync  # type: ignore
from pycync.exceptions import TwoFactorRequiredError, AuthFailedError  # type: ignore
from pycync.user import User  # type: ignore

from lyric_search.sources import private
from auth.deps import get_current_user


class Lighting(FastAPI):
    """
    Lighting Endpoints

    Registers GET/POST ``/lighting/state`` on the supplied FastAPI app, keeps
    the last requested state in Redis and applies it to a Cync device through
    a persistent Cync API object.
    """

    # NOTE(review): this class subclasses FastAPI but __init__ never calls
    # super().__init__(); routes are registered on the ``app`` argument, not on
    # the instance itself. Left unchanged to keep the public interface stable.

    def __init__(self, app: FastAPI, util, constants) -> None:
        """Initialize Lighting endpoints and persistent Cync connection."""
        load_dotenv()
        self.app: FastAPI = app
        self.util = util
        self.constants = constants
        self.redis_client = redis.Redis(password=private.REDIS_PW, decode_responses=True)
        self.lighting_key = "lighting:state"

        # Cync config
        self.cync_email = os.getenv("CYNC_EMAIL")
        self.cync_password = os.getenv("CYNC_PASSWORD")
        self.cync_device_name = os.getenv("CYNC_DEVICE_NAME")
        self.token_cache_path = "cync_token.json"
        self.session = None
        self.auth = None
        self.cync_user = None
        self.cync_api = None

        # Set up Cync connection at startup using FastAPI event
        @app.on_event("startup")
        async def startup_event():
            self._require_cync_config()
            self.session = aiohttp.ClientSession()
            await self._authenticate()

        self.endpoints: dict = {
            "lighting/state": self.get_lighting_state,
        }

        for endpoint, handler in self.endpoints.items():
            app.add_api_route(
                f"/{endpoint}",
                handler,
                methods=["GET"],
                include_in_schema=True,
                dependencies=[Depends(RateLimiter(times=10, seconds=2)), Depends(get_current_user)],
            )

        app.add_api_route(
            "/lighting/state",
            self.set_lighting_state,
            methods=["POST"],
            include_in_schema=True,
            dependencies=[Depends(RateLimiter(times=10, seconds=2)), Depends(get_current_user)],
        )

    def _require_cync_config(self) -> None:
        """Raise if any of the Cync environment variables read in __init__ is unset or empty."""
        required = {
            "CYNC_EMAIL": self.cync_email,
            "CYNC_PASSWORD": self.cync_password,
            "CYNC_DEVICE_NAME": self.cync_device_name,
        }
        missing_vars = [name for name, value in required.items() if not value]
        if missing_vars:
            raise Exception(f"Missing required environment variables: {', '.join(missing_vars)}")

    async def _authenticate(self) -> None:
        """
        Authenticate against Cync on the current session and create the API object.

        Builds ``self.auth`` (seeded with the cached user when one can be
        loaded), tries a token refresh when the cached token has not expired
        yet, falls back to a full login, and finally stores the result of
        ``Cync.create`` in ``self.cync_api``.

        Raises:
            Exception: when Cync demands 2FA or rejects the credentials.
        """
        # _require_cync_config() has already run in both callers, so the
        # ``or ""`` fallbacks only exist to satisfy the type checker.
        cync_email: str = self.cync_email or ""
        cync_password: str = self.cync_password or ""

        cached_user = self._load_cached_user()
        if cached_user:
            self.auth = Auth(
                session=self.session,
                user=cached_user,
                username=cync_email,
                password=cync_password,
            )
        else:
            self.auth = Auth(session=self.session, username=cync_email, password=cync_password)

        # Try to refresh token
        self.cync_user = None
        auth_user = self.auth.user
        if auth_user and hasattr(auth_user, "expires_at") and auth_user.expires_at > time.time():
            try:
                await self.auth.async_refresh_user_token()
                self.cync_user = self.auth.user
                self._save_cached_user(self.cync_user)
            except AuthFailedError as e:
                # Best effort only: a failed refresh falls through to a full login.
                logging.warning(f"Cync token refresh failed, falling back to login: {e}")

        # If no valid token, login
        if not self.cync_user:
            try:
                self.cync_user = await self.auth.login()
            except TwoFactorRequiredError as e:
                logging.error("Cync 2FA required. Set CYNC_2FA_CODE in env if needed.")
                raise Exception("Cync 2FA required.") from e
            except AuthFailedError as e:
                logging.error(f"Failed to authenticate with Cync API: {e}")
                raise Exception("Cync authentication failed.") from e
            self._save_cached_user(self.cync_user)

        # Create persistent Cync API object
        self.cync_api = await Cync.create(self.auth)

    async def ensure_cync_connection(self):
        """Ensure aiohttp session and Cync API are alive, re-create if needed."""
        self._require_cync_config()

        # Re-authenticate only when the session is missing or has been closed.
        if not self.session or getattr(self.session, "closed", False):
            self.session = aiohttp.ClientSession()
            await self._authenticate()

        # Also check if cync_api is None (shouldn't happen, but just in case)
        if not self.cync_api:
            if not self.auth:
                logging.critical("self.auth: %s", self.auth)
                return
            self.cync_api = await Cync.create(self.auth)

    def _load_cached_user(self):
        """
        Load the cached Cync user from ``self.token_cache_path``.

        Returns the ``User`` built from the JSON file, or ``None`` when the
        file does not exist or cannot be read/parsed (a warning is logged in
        the latter case).
        """
        try:
            if os.path.exists(self.token_cache_path):
                with open(self.token_cache_path, "r") as f:
                    data = json.load(f)
                return User(
                    access_token=data["access_token"],
                    refresh_token=data["refresh_token"],
                    authorize=data["authorize"],
                    user_id=data["user_id"],
                    expires_at=data["expires_at"],
                )
        except Exception as e:
            logging.warning(f"Failed to load cached Cync user: {e}")
        return None

    def _save_cached_user(self, user):
        """
        Write the user's tokens to ``self.token_cache_path`` as JSON.

        Failures are logged as warnings and never raised, so a read-only or
        missing cache location does not break authentication.
        """
        try:
            data = {
                "access_token": user.access_token,
                "refresh_token": user.refresh_token,
                "authorize": user.authorize,
                "user_id": user.user_id,
                "expires_at": user.expires_at,
            }
            with open(self.token_cache_path, "w") as f:
                json.dump(data, f)
            logging.info("Saved Cync user tokens to disk.")
        except Exception as e:
            logging.warning(f"Failed to save Cync user tokens: {e}")

    @staticmethod
    def _validate_state(state: dict):
        """
        Validate a requested lighting state.

        Returns a ``(power, brightness, rgb)`` tuple where ``rgb`` is a
        3-tuple of ints or ``None`` when the request carries no colour.

        Raises:
            HTTPException: 400 when power, brightness or a colour channel is invalid.
        """
        power = state.get("power", "off")
        if power not in ("on", "off"):
            raise HTTPException(status_code=400, detail=f"Invalid power state: {power}")

        brightness = state.get("brightness", 50)
        # bool is a subclass of int, so reject it explicitly.
        if (
            isinstance(brightness, bool)
            or not isinstance(brightness, (int, float))
            or not (0 <= brightness <= 100)
        ):
            raise HTTPException(status_code=400, detail=f"Invalid brightness: {brightness}")

        # Two accepted colour shapes: {"color": {"r", "g", "b"}} or
        # top-level "red"/"green"/"blue" keys.
        color = state.get("color")
        if color and isinstance(color, dict) and all(k in color for k in ("r", "g", "b")):
            rgb = (color["r"], color["g"], color["b"])
        elif all(k in state for k in ("red", "green", "blue")):
            rgb = (state["red"], state["green"], state["blue"])
        else:
            rgb = None

        # Validate the channels for BOTH shapes before anything reaches the device.
        if rgb is not None:
            for val, name in zip(rgb, ("red", "green", "blue")):
                if isinstance(val, bool) or not isinstance(val, int) or not (0 <= val <= 255):
                    raise HTTPException(status_code=400, detail=f"Invalid {name} color value: {val}")

        return power, brightness, rgb

    async def get_lighting_state(self) -> JSONResponse:
        """
        Get the current lighting state.

        Returns:
        - **JSONResponse**: Contains the current lighting state.
        """
        try:
            state = self.redis_client.get(self.lighting_key)
            if state:
                return JSONResponse(content=json.loads(str(state)))
            # Nothing stored yet: report the default state
            default_state = {
                "power": "off",
                "brightness": 50,
                "color": {"r": 255, "g": 255, "b": 255},
            }
            return JSONResponse(content=default_state)
        except Exception as e:
            logging.error(f"Error getting lighting state: {e}")
            raise HTTPException(status_code=500, detail="Internal server error")

    async def set_lighting_state(self, request: Request) -> JSONResponse:
        """
        Set the lighting state and apply it to the Cync device.
        """
        try:
            # A malformed body is a client error, not a server error.
            # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
            try:
                state = await request.json()
            except ValueError as e:
                raise HTTPException(status_code=400, detail="Request body must be valid JSON") from e

            # Validate state (basic validation)
            if not isinstance(state, dict):
                raise HTTPException(status_code=400, detail="State must be a JSON object")

            # Validate everything BEFORE persisting, so a rejected request
            # cannot overwrite the stored state.
            power, brightness, rgb = self._validate_state(state)

            # Store in Redis
            self.redis_client.set(self.lighting_key, json.dumps(state))

            await self.ensure_cync_connection()

            # Use persistent Cync API object
            if not self.cync_api:
                raise HTTPException(status_code=500, detail="Cync API not initialized.")

            devices = self.cync_api.get_devices()
            if not devices or not isinstance(devices, (list, tuple)):
                raise HTTPException(status_code=500, detail="No devices returned from Cync API.")

            light = next(
                (d for d in devices if hasattr(d, "name") and d.name == self.cync_device_name),
                None,
            )
            if not light:
                raise HTTPException(status_code=404, detail=f"Device '{self.cync_device_name}' not found")

            # Set power
            if power == "on":
                await light.turn_on()
            else:
                await light.turn_off()

            # Set brightness (only when the caller supplied one; the default
            # of 50 is used for validation only)
            if "brightness" in state:
                await light.set_brightness(brightness)

            # Set color
            if rgb is not None:
                await light.set_rgb(rgb)

            logging.info(f"Successfully applied state to device '{self.cync_device_name}': {state}")
            return JSONResponse(content={"message": "Lighting state updated and applied", "state": state})
        except HTTPException:
            raise
        except Exception as e:
            logging.error(f"Error setting lighting state: {e}")
            raise HTTPException(status_code=500, detail="Internal server error")