Files
api/endpoints/lighting.py

444 lines
18 KiB
Python
Raw Permalink Normal View History

import logging
import json
import os
import time
import aiohttp
import asyncio
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
import redis
from lyric_search.sources import private
from auth.deps import get_current_user
from dotenv import load_dotenv
2025-10-02 10:45:30 -04:00
from pycync.user import User # type: ignore
from pycync.cync import Cync as Cync # type: ignore
from pycync import Auth # type: ignore
from pycync.exceptions import TwoFactorRequiredError, AuthFailedError # type: ignore
import inspect
import getpass
class Lighting(FastAPI):
async def ensure_cync_connection(self):
    """Ensure aiohttp session and Cync API are alive, re-create if needed.

    Steps:
      1. Verify that the Cync email, password and device name are set.
      2. If the aiohttp session is missing or closed, open a new one and
         rebuild ``self.auth``: a cached token that has not expired is
         reused, otherwise a fresh login is performed (with a 2FA retry
         when the API asks for one). ``self.cync_api`` is then re-created.
      3. If ``self.cync_api`` is still unset, create it from ``self.auth``;
         when ``self.auth`` is unset as well, log and return.

    Raises:
        Exception: if a required setting is missing, if 2FA is required
            but no code is available or the code is rejected, or if the
            Cync API rejects the credentials.
    """
    # Check required environment variables
    missing_vars = [
        name
        for name, value in (
            ("CYNC_EMAIL", self.cync_email),
            ("CYNC_PASSWORD", self.cync_password),
            ("CYNC_DEVICE_NAME", self.cync_device_name),
        )
        if not value
    ]
    if missing_vars:
        raise Exception(
            f"Missing required environment variables: {', '.join(missing_vars)}"
        )
    # Cast to str after check to silence linter
    cync_email: str = self.cync_email  # type: ignore
    cync_password: str = self.cync_password  # type: ignore

    # Check if session is closed or missing
    if not self.session or getattr(self.session, "closed", False):
        self.session = aiohttp.ClientSession()

        # Load cached token and check validity
        self.cync_user = None
        cached_user = self._load_cached_user()
        if not cached_user:
            token_status = "no cached user"
        elif not hasattr(cached_user, "expires_at"):
            token_status = "missing expires_at"
        elif cached_user.expires_at > time.time():
            token_status = "valid"
        else:
            token_status = "expired"
        logging.info("Cync token status: %s", token_status)

        if token_status == "valid":
            # Use cached token
            self.auth = Auth(
                session=self.session,
                user=cached_user,
                username=cync_email,
                password=cync_password,
            )
            self.cync_user = cached_user
            logging.info("Reusing valid cached token, no 2FA required.")
        else:
            # Need fresh login
            self.auth = Auth(
                session=self.session,
                username=cync_email,
                password=cync_password,
            )
            try:
                self.cync_user = await self.auth.login()
                self._save_cached_user(self.cync_user)
            except TwoFactorRequiredError:
                twofa_code = os.getenv("CYNC_2FA_CODE")
                if not twofa_code:
                    # NOTE(review): getpass blocks the event loop while it
                    # waits for input; left as-is because nothing else
                    # guards against concurrent callers during the prompt.
                    print("Cync 2FA required. Please enter your code:")
                    twofa_code = getpass.getpass("2FA Code: ")
                if not twofa_code:
                    logging.error("Cync 2FA required but no code provided.")
                    raise Exception("Cync 2FA required.")
                logging.info("Retrying Cync login with 2FA code.")
                try:
                    self.cync_user = await self.auth.login(
                        two_factor_code=twofa_code
                    )
                    self._save_cached_user(self.cync_user)
                    logging.info("Logged in with 2FA successfully.")
                except Exception as login_err:
                    logging.error("Cync 2FA login failed: %s", login_err)
                    # Chain the cause so the real failure stays in the traceback.
                    raise Exception(
                        "Cync 2FA code invalid or not accepted."
                    ) from login_err
            except AuthFailedError as auth_err:
                logging.error("Failed to authenticate with Cync API: %s", auth_err)
                raise Exception("Cync authentication failed.") from auth_err
        self.cync_api = await Cync.create(self.auth)

    # Also check if cync_api is None (shouldn't happen, but just in case)
    if not self.cync_api:
        if not self.auth:
            logging.critical("self.auth: %s", self.auth)
            return
        self.cync_api = await Cync.create(self.auth)
2025-10-02 10:45:30 -04:00
"""
Lighting Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None:
    """Initialize Lighting endpoints and persistent Cync connection.

    Args:
        app: FastAPI application the routes and startup hook are attached to.
        util: Stored on the instance as ``self.util``.
        constants: Stored on the instance as ``self.constants``.
    """
    load_dotenv()
    self.app: FastAPI = app
    self.util = util
    self.constants = constants
    self.redis_client = redis.Redis(
        password=private.REDIS_PW, decode_responses=True
    )
    self.lighting_key = "lighting:state"

    # Cync config
    self.cync_email = os.getenv("CYNC_EMAIL")
    self.cync_password = os.getenv("CYNC_PASSWORD")
    self.cync_device_name = os.getenv("CYNC_DEVICE_NAME")
    self.token_cache_path = "cync_token.json"
    self.session = None
    self.auth = None
    self.cync_user = None
    self.cync_api = None
    # Strong reference to the background task: the event loop only keeps a
    # weak reference, so an unreferenced task may be garbage-collected.
    self._token_validation_task = None

    # Set up Cync connection at startup using FastAPI event
    @app.on_event("startup")
    async def startup_event():
        # ensure_cync_connection checks the required environment variables
        # and handles token caching / login.
        await self.ensure_cync_connection()
        # Create the persistent Cync API object only if it is still missing,
        # rather than building a second one right after the first.
        if self.auth and not self.cync_api:
            self.cync_api = await Cync.create(self.auth)
        # Schedule periodic token validation
        self._token_validation_task = asyncio.create_task(
            self._schedule_token_validation()
        )

    def route_dependencies() -> list:
        # Built fresh for every route, as the original did; presumably each
        # RateLimiter keeps its own counters -- TODO confirm.
        return [
            Depends(RateLimiter(times=25, seconds=2)),
            Depends(get_current_user),
        ]

    # Register endpoints
    self.endpoints: dict = {
        "lighting/state": self.get_lighting_state,
    }
    for endpoint, handler in self.endpoints.items():
        self.app.add_api_route(
            f"/{endpoint}",
            handler,
            methods=["GET"],
            include_in_schema=True,
            dependencies=route_dependencies(),
        )
    self.app.add_api_route(
        "/lighting/state",
        self.set_lighting_state,
        methods=["POST"],
        include_in_schema=True,
        dependencies=route_dependencies(),
    )
async def _refresh_or_login(self):
if not self.auth:
logging.error("Auth object is not initialized.")
raise Exception("Cync authentication not initialized.")
try:
user = getattr(self.auth, 'user', None)
if user and hasattr(user, "expires_at") and user.expires_at > time.time():
refresh = getattr(self.auth, 'async_refresh_user_token', None)
if callable(refresh):
try:
result = refresh()
if inspect.isawaitable(result):
await result
else:
pass # do nothing if not awaitable
except AuthFailedError as e:
logging.warning("Token refresh failed: %s", e)
login = getattr(self.auth, 'login', None)
if callable(login):
try:
result = login()
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in successfully.")
except TwoFactorRequiredError:
twofa_code = os.getenv("CYNC_2FA_CODE")
if not twofa_code:
# Prompt interactively if not set
print("Cync 2FA required. Please enter your code:")
twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
result = login(two_factor_code=twofa_code)
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.")
else:
raise Exception("Auth object missing login method.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
except Exception as e:
logging.error("Unexpected error during authentication: %s", e)
raise
async def _schedule_token_validation(self):
    """Background loop that renews the Cync token shortly before expiry.

    Sleeps 300 seconds between checks. When the auth object's user has an
    ``expires_at`` less than 600 seconds in the future, ``_refresh_or_login``
    is awaited. Errors are logged and the loop keeps running.
    """
    check_interval = 300  # seconds between checks
    refresh_margin = 600  # refresh once fewer than this many seconds remain
    while True:
        try:
            await asyncio.sleep(check_interval)
            current = getattr(self.auth, "user", None)
            if not current or not hasattr(current, "expires_at"):
                continue
            seconds_left = current.expires_at - time.time()
            if seconds_left < refresh_margin:
                logging.info("Token is about to expire. Refreshing...")
                await self._refresh_or_login()
        except Exception as err:
            logging.error("Error during periodic token validation: %s", err)
def _load_cached_user(self):
try:
if os.path.exists(self.token_cache_path):
with open(self.token_cache_path, "r") as f:
data = json.load(f)
return User(
access_token=data["access_token"],
refresh_token=data["refresh_token"],
authorize=data["authorize"],
user_id=data["user_id"],
2025-10-02 10:45:30 -04:00
expires_at=data["expires_at"],
)
except Exception as e:
2025-10-02 10:45:30 -04:00
logging.warning("Failed to load cached Cync user: %s", e)
return None
def _save_cached_user(self, user):
try:
data = {
"access_token": user.access_token,
"refresh_token": user.refresh_token,
"authorize": user.authorize,
"user_id": user.user_id,
2025-10-02 10:45:30 -04:00
"expires_at": user.expires_at,
}
with open(self.token_cache_path, "w") as f:
json.dump(data, f)
logging.info("Saved Cync user tokens to disk.")
except Exception as e:
2025-10-02 10:45:30 -04:00
logging.warning("Failed to save Cync user tokens: %s", e)
async def get_lighting_state(self) -> JSONResponse:
    """
    Get the current lighting state.

    Reads the value stored under ``self.lighting_key`` in Redis; when no
    value is stored, a default state is returned instead.

    Returns:
    - **JSONResponse**: Contains the current lighting state.

    Raises:
    - **HTTPException**: 500 if reading or decoding the state fails.
    """
    # Default state
    fallback_state = {
        "power": "off",
        "brightness": 50,
        "color": {"r": 255, "g": 255, "b": 255},
    }
    try:
        stored = self.redis_client.get(self.lighting_key)
        payload = json.loads(str(stored)) if stored else fallback_state
        return JSONResponse(content=payload)
    except Exception as err:
        logging.error("Error getting lighting state: %s", err)
        raise HTTPException(status_code=500, detail="Internal server error")
async def set_lighting_state(self, request: Request) -> JSONResponse:
    """
    Set the lighting state and apply it to the Cync device.

    The JSON body is validated first; only a valid state is stored in
    Redis and sent to the device. Applying the state is attempted up to
    two times, reconnecting to Cync between attempts.

    Returns:
    - **JSONResponse**: Confirmation message plus the applied state.

    Raises:
    - **HTTPException**: 400 for malformed JSON or invalid values,
      500 for any other failure.
    """
    logging.info("=== LIGHTING STATE REQUEST RECEIVED ===")
    try:
        try:
            state = await request.json()
        except ValueError as e:
            # json.JSONDecodeError is a ValueError: a malformed body is a
            # client error, not a server error.
            raise HTTPException(
                status_code=400, detail="Request body must be valid JSON"
            ) from e
        logging.info("Requested state: %s", state)

        # Validate before any side effect so rejected payloads are never stored
        power, brightness, rgb = self._validate_lighting_state(state)

        # Store in Redis
        self.redis_client.set(self.lighting_key, json.dumps(state))
        await self.ensure_cync_connection()

        # Apply to Cync device with retry on connection issues
        max_retries = 2
        for attempt in range(max_retries):
            try:
                await self._apply_lighting_state(state, power, brightness, rgb)
                break  # Success, exit retry loop
            except (aiohttp.ClientConnectionError, aiohttp.ClientOSError) as e:
                if attempt < max_retries - 1:
                    logging.warning(
                        "Connection closed (attempt %d/%d): %s. Retrying with reconnection.",
                        attempt + 1,
                        max_retries,
                        e,
                    )
                    await self.ensure_cync_connection()
                else:
                    logging.error(
                        "Connection failed after %d attempts: %s",
                        max_retries,
                        e,
                    )
                    raise
            except Exception as e:
                logging.error("Unexpected error during device operation: %s", e)
                logging.error("Error type: %s", type(e).__name__)
                # Try to reconnect on any error for next attempt
                if attempt < max_retries - 1:
                    logging.warning("Attempting reconnection due to error...")
                    await self.ensure_cync_connection()
                else:
                    raise

        logging.info(
            "Successfully applied state to device '%s': %s",
            self.cync_device_name,
            state,
        )
        return JSONResponse(
            content={
                "message": "Lighting state updated and applied",
                "state": state,
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        logging.error("Error setting lighting state: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error")


@staticmethod
def _validate_lighting_state(state):
    """Validate a requested state and return ``(power, brightness, rgb)``.

    ``rgb`` is a 3-tuple taken from ``state["color"]`` (keys ``r``/``g``/``b``)
    or from the flat ``red``/``green``/``blue`` keys, or ``None`` when
    neither form is present. Raises ``HTTPException`` (400) on invalid input.
    """
    if not isinstance(state, dict):
        raise HTTPException(
            status_code=400, detail="State must be a JSON object"
        )

    power = state.get("power", "off")
    if power not in ("on", "off"):
        raise HTTPException(
            status_code=400, detail=f"Invalid power state: {power}"
        )

    brightness = state.get("brightness", 50)
    # bool is a subclass of int, so JSON true/false must be rejected explicitly.
    if (
        isinstance(brightness, bool)
        or not isinstance(brightness, (int, float))
        or not (0 <= brightness <= 100)
    ):
        raise HTTPException(
            status_code=400, detail=f"Invalid brightness: {brightness}"
        )

    color = state.get("color")
    if (
        color
        and isinstance(color, dict)
        and all(k in color for k in ("r", "g", "b"))
    ):
        rgb = (color["r"], color["g"], color["b"])
    elif all(k in state for k in ("red", "green", "blue")):
        rgb = (state["red"], state["green"], state["blue"])
    else:
        return power, brightness, None

    # Both input forms get the same per-channel validation.
    for val, name in zip(rgb, ("red", "green", "blue")):
        if (
            isinstance(val, bool)
            or not isinstance(val, int)
            or not (0 <= val <= 255)
        ):
            raise HTTPException(
                status_code=400, detail=f"Invalid {name} color value: {val}"
            )
    return power, brightness, rgb


async def _apply_lighting_state(self, state, power, brightness, rgb):
    """Make one attempt to push the validated state to the configured device.

    Raises ``Exception`` when the Cync API cannot be initialised or no
    device named ``self.cync_device_name`` is found; errors from the device
    calls propagate to the caller, which owns the retry policy.
    """
    # Use persistent Cync API object
    if not self.cync_api:
        logging.warning("Cync API not initialized, attempting to reconnect...")
        await self.ensure_cync_connection()
        if not self.cync_api:
            raise Exception("Cync API still not initialized after reconnection.")

    logging.info("Getting devices from Cync API...")
    devices = self.cync_api.get_devices()
    device_names = [getattr(d, "name", None) for d in devices]
    logging.info("Devices returned from Cync API: %s", device_names)

    light = next(
        (
            d
            for d in devices
            if hasattr(d, "name") and d.name == self.cync_device_name
        ),
        None,
    )
    if not light:
        logging.error(
            "Device '%s' not found in %s", self.cync_device_name, device_names
        )
        raise Exception(f"Device '{self.cync_device_name}' not found")
    logging.info("Selected device: %s", light)

    # Set power
    if power == "on":
        result = await light.turn_on()
        logging.info("turn_on result: %s", result)
    else:
        result = await light.turn_off()
        logging.info("turn_off result: %s", result)

    # Set brightness (only when the caller supplied one explicitly)
    if "brightness" in state:
        result = await light.set_brightness(brightness)
        logging.info("set_brightness result: %s", result)

    # Set color
    if rgb:
        result = await light.set_rgb(rgb)
        logging.info("set_rgb result: %s", result)