From fa3c8e8861cdd5ba9ed64a91a1ba28b70df5660b Mon Sep 17 00:00:00 2001 From: codey Date: Thu, 2 Oct 2025 10:40:11 -0400 Subject: [PATCH] Add lighting endpoint with Cync integration and state management --- base.py | 1 + endpoints/lighting.py | 284 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 285 insertions(+) create mode 100644 endpoints/lighting.py diff --git a/base.py b/base.py index b514bff..248a071 100644 --- a/base.py +++ b/base.py @@ -101,6 +101,7 @@ routes: dict = { "meme": importlib.import_module("endpoints.meme").Meme(app, util, constants), "trip": importlib.import_module("endpoints.rip").RIP(app, util, constants), "auth": importlib.import_module("endpoints.auth").Auth(app), + "lighting": importlib.import_module("endpoints.lighting").Lighting(app, util, constants), } # Misc endpoint depends on radio endpoint instance diff --git a/endpoints/lighting.py b/endpoints/lighting.py new file mode 100644 index 0000000..73d140d --- /dev/null +++ b/endpoints/lighting.py @@ -0,0 +1,284 @@ +import logging +import json +import os +import time +import aiohttp +from fastapi import FastAPI, Depends, HTTPException, Request +from fastapi_throttle import RateLimiter +from fastapi.responses import JSONResponse +import redis +from lyric_search.sources import private +from auth.deps import get_current_user +from dotenv import load_dotenv +from pycync.user import User # type: ignore +from pycync.cync import Cync as Cync # type: ignore +from pycync import Auth # type: ignore +from pycync.exceptions import TwoFactorRequiredError, AuthFailedError # type: ignore + + +class Lighting(FastAPI): + async def ensure_cync_connection(self): + """Ensure aiohttp session and Cync API are alive, re-create if needed.""" + # Check required environment variables + missing_vars = [] + if not self.cync_email: + missing_vars.append("CYNC_EMAIL") + if not self.cync_password: + missing_vars.append("CYNC_PASSWORD") + if not self.cync_device_name: + missing_vars.append("CYNC_DEVICE_NAME") + if missing_vars: + raise Exception(f"Missing required environment variables: {', '.join(missing_vars)}") + # Cast to str after check to silence linter + cync_email: str = self.cync_email # type: ignore + cync_password: str = self.cync_password # type: ignore + + # Check if session is closed or missing + if not self.session or getattr(self.session, 'closed', False): + self.session = aiohttp.ClientSession() + cached_user = self._load_cached_user() + if cached_user: + self.auth = Auth(session=self.session, + user=cached_user, + username=cync_email, + password=cync_password) + else: + self.auth = Auth(session=self.session, username=cync_email, + password=cync_password) + # Try to refresh token + self.cync_user = None + if self.auth.user and hasattr(self.auth.user, 'expires_at') and self.auth.user.expires_at > time.time(): + try: + await self.auth.async_refresh_user_token() + self.cync_user = self.auth.user + self._save_cached_user(self.cync_user) + except AuthFailedError: + pass + # If no valid token, login + if not self.cync_user: + try: + self.cync_user = await self.auth.login() + self._save_cached_user(self.cync_user) + except TwoFactorRequiredError: + logging.error("Cync 2FA required. Set CYNC_2FA_CODE in env if needed.") + raise Exception("Cync 2FA required.") + except AuthFailedError as e: + logging.error(f"Failed to authenticate with Cync API: {e}") + raise Exception("Cync authentication failed.") + self.cync_api = await Cync.create(self.auth) + # Also check if cync_api is None (shouldn't happen, but just in case) + if not self.cync_api: + if not self.auth: + logging.critical("self.auth: %s", self.auth) + return + self.cync_api = await Cync.create(self.auth) + """ + Lighting Endpoints + """ + + def __init__(self, app: FastAPI, util, constants) -> None: + """Initialize Lighting endpoints and persistent Cync connection.""" + load_dotenv() + self.app: FastAPI = app + self.util = util + self.constants = constants + self.redis_client = redis.Redis(password=private.REDIS_PW, decode_responses=True) + self.lighting_key = "lighting:state" + + # Cync config + self.cync_email = os.getenv("CYNC_EMAIL") + self.cync_password = os.getenv("CYNC_PASSWORD") + self.cync_device_name = os.getenv("CYNC_DEVICE_NAME") + self.token_cache_path = "cync_token.json" + self.session = None + self.auth = None + self.cync_user = None + self.cync_api = None + + # Set up Cync connection at startup using FastAPI event + @app.on_event("startup") + async def startup_event(): + # Check required environment variables + missing_vars = [] + if not self.cync_email: + missing_vars.append("CYNC_EMAIL") + if not self.cync_password: + missing_vars.append("CYNC_PASSWORD") + if not self.cync_device_name: + missing_vars.append("CYNC_DEVICE_NAME") + if missing_vars: + raise Exception(f"Missing required environment variables: {', '.join(missing_vars)}") + + self.session = aiohttp.ClientSession() + cached_user = self._load_cached_user() + if cached_user: + self.auth = Auth(session=self.session, user=cached_user, + username=self.cync_email or "", + password=self.cync_password or "") + else: + self.auth = Auth(session=self.session, + username=self.cync_email or "", + password=self.cync_password or "") + # Try to refresh token + if self.auth.user and hasattr(self.auth.user, 'expires_at') and self.auth.user.expires_at > time.time(): + try: + await self.auth.async_refresh_user_token() + self.cync_user = self.auth.user + self._save_cached_user(self.cync_user) + except AuthFailedError: + pass + # If no valid token, login + if not self.cync_user: + try: + self.cync_user = await self.auth.login() + self._save_cached_user(self.cync_user) + except TwoFactorRequiredError: + logging.error("Cync 2FA required. Set CYNC_2FA_CODE in env if needed.") + raise Exception("Cync 2FA required.") + except AuthFailedError as e: + logging.error(f"Failed to authenticate with Cync API: {e}") + raise Exception("Cync authentication failed.") + # Create persistent Cync API object + self.cync_api = await Cync.create(self.auth) + + self.endpoints: dict = { + "lighting/state": self.get_lighting_state, + } + + for endpoint, handler in self.endpoints.items(): + app.add_api_route( + f"/{endpoint}", + handler, + methods=["GET"], + include_in_schema=True, + dependencies=[Depends(RateLimiter(times=10, seconds=2)), Depends(get_current_user)], + ) + + app.add_api_route( + "/lighting/state", + self.set_lighting_state, + methods=["POST"], + include_in_schema=True, + dependencies=[Depends(RateLimiter(times=10, seconds=2)), Depends(get_current_user)], + ) + + def _load_cached_user(self): + try: + if os.path.exists(self.token_cache_path): + with open(self.token_cache_path, "r") as f: + data = json.load(f) + return User( + access_token=data["access_token"], + refresh_token=data["refresh_token"], + authorize=data["authorize"], + user_id=data["user_id"], + expires_at=data["expires_at"] + ) + except Exception as e: + logging.warning(f"Failed to load cached Cync user: {e}") + return None + + def _save_cached_user(self, user): + try: + data = { + "access_token": user.access_token, + "refresh_token": user.refresh_token, + "authorize": user.authorize, + "user_id": user.user_id, + "expires_at": user.expires_at + } + with open(self.token_cache_path, "w") as f: + json.dump(data, f) + logging.info("Saved Cync user tokens to disk.") + except Exception as e: + logging.warning(f"Failed to save Cync user tokens: {e}") + + async def get_lighting_state(self) -> JSONResponse: + """ + Get the current lighting state. + + Returns: + - **JSONResponse**: Contains the current lighting state. + """ + try: + state = self.redis_client.get(self.lighting_key) + if state: + return JSONResponse(content=json.loads(str(state))) + else: + # Default state + default_state = { + "power": "off", + "brightness": 50, + "color": {"r": 255, "g": 255, "b": 255} + } + return JSONResponse(content=default_state) + except Exception as e: + logging.error(f"Error getting lighting state: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + async def set_lighting_state(self, request: Request) -> JSONResponse: + """ + Set the lighting state and apply it to the Cync device. + """ + try: + state = await request.json() + # Validate state (basic validation) + if not isinstance(state, dict): + raise HTTPException(status_code=400, detail="State must be a JSON object") + + # Store in Redis + self.redis_client.set(self.lighting_key, json.dumps(state)) + + await self.ensure_cync_connection() + + # Apply to Cync device + power = state.get("power", "off") + if power not in ["on", "off"]: + raise HTTPException(status_code=400, detail=f"Invalid power state: {power}") + + brightness = state.get("brightness", 50) + if not isinstance(brightness, (int, float)) or not (0 <= brightness <= 100): + raise HTTPException(status_code=400, detail=f"Invalid brightness: {brightness}") + + color = state.get("color") + if color and isinstance(color, dict) and all(k in color for k in ["r", "g", "b"]): + rgb = (color["r"], color["g"], color["b"]) + elif all(k in state for k in ["red", "green", "blue"]): + rgb = (state["red"], state["green"], state["blue"]) + for val, name in zip(rgb, ["red", "green", "blue"]): + if not isinstance(val, int) or not (0 <= val <= 255): + raise HTTPException(status_code=400, detail=f"Invalid {name} color value: {val}") + else: + rgb = None + + # Use persistent Cync API object + if not self.cync_api: + raise HTTPException(status_code=500, detail="Cync API not initialized.") + devices = self.cync_api.get_devices() + if not devices or not isinstance(devices, (list, tuple)): + raise HTTPException(status_code=500, detail="No devices returned from Cync API.") + light = next((d for d in devices if hasattr(d, 'name') and d.name == self.cync_device_name), None) + if not light: + raise HTTPException(status_code=404, detail=f"Device '{self.cync_device_name}' not found") + + # Set power + if power == "on": + await light.turn_on() + else: + await light.turn_off() + + # Set brightness + if "brightness" in state: + await light.set_brightness(brightness) + + # Set color + if rgb: + await light.set_rgb(rgb) + + logging.info(f"Successfully applied state to device '{self.cync_device_name}': {state}") + return JSONResponse(content={"message": "Lighting state updated and applied", "state": state}) + except HTTPException: + raise + except Exception as e: + logging.error(f"Error setting lighting state: {e}") + raise HTTPException(status_code=500, detail="Internal server error") \ No newline at end of file