Files
api/endpoints/lighting.py
2025-11-22 21:43:48 -05:00

742 lines
31 KiB
Python

import logging
import json
import os
import time
import aiohttp
import asyncio
import traceback
from datetime import datetime
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
import redis
from lyric_search.sources import private
from auth.deps import get_current_user
from dotenv import load_dotenv
from pycync.user import User # type: ignore
from pycync.cync import Cync as Cync # type: ignore
from pycync import Auth # type: ignore
from pycync.exceptions import TwoFactorRequiredError, AuthFailedError # type: ignore
import inspect
import getpass
from typing import Optional
# Route log records (INFO and above) to a dedicated file so that Cync
# authentication events can be reviewed after the fact.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename="cync_auth_events.log",
)
def _mask_token(token: Optional[str]) -> str:
"""Mask sensitive token data for logging, showing only first/last 4 chars."""
if not token or len(token) < 8:
return "<invalid_token>"
return f"{token[:4]}...{token[-4:]}"
def _log_token_state(user, context: str):
"""Log masked token state for debugging."""
if not user:
logging.info(f"{context} - No user object available")
return
try:
logging.info(
f"{context} - Token state: access=%s refresh=%s expires_at=%s",
_mask_token(getattr(user, "access_token", None)),
_mask_token(getattr(user, "refresh_token", None)),
getattr(user, "expires_at", None),
)
except Exception as e:
logging.error(f"Error logging token state: {e}")
class Lighting(FastAPI):
async def _close_session_safely(self):
"""Safely close the current session if it exists."""
if self.session and not getattr(self.session, "closed", True):
try:
await self.session.close()
# Wait a bit for the underlying connections to close
await asyncio.sleep(0.1)
except Exception as e:
logging.warning(f"Error closing session: {e}")
self.session = None
self.auth = None
self.cync_api = None
async def _test_connection_health(self) -> bool:
"""Test if the current connection is healthy by making a simple API call."""
if (
not self.cync_api
or not self.session
or getattr(self.session, "closed", True)
):
return False
try:
# Make a simple API call to test connectivity
devices = self.cync_api.get_devices()
# Just check if we get a response without errors
return devices is not None
except Exception as e:
logging.warning(f"Connection health check failed: {e}")
return False
async def ensure_cync_connection(self, force_reconnect: bool = False):
"""Ensure aiohttp session and Cync API are alive, re-create if needed."""
# Check required environment variables
missing_vars = []
if not self.cync_email:
missing_vars.append("CYNC_EMAIL")
if not self.cync_password:
missing_vars.append("CYNC_PASSWORD")
if not self.cync_device_name:
missing_vars.append("CYNC_DEVICE_NAME")
if missing_vars:
raise Exception(
f"Missing required environment variables: {', '.join(missing_vars)}"
)
# Cast to str after check to silence linter
cync_email: str = self.cync_email # type: ignore
cync_password: str = self.cync_password # type: ignore
# If force_reconnect is True or connection is unhealthy, rebuild everything
if force_reconnect or not await self._test_connection_health():
logging.info(
"Connection unhealthy or force reconnect requested. Rebuilding connection..."
)
# Clean up existing connection
await self._close_session_safely()
# Create new session with timeout configuration
timeout = aiohttp.ClientTimeout(total=30, connect=10)
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=30,
ttl_dns_cache=300,
use_dns_cache=True,
keepalive_timeout=60,
enable_cleanup_closed=True,
)
self.session = aiohttp.ClientSession(timeout=timeout, connector=connector)
# Load cached token and check validity
self.cync_user = None
cached_user = self._load_cached_user()
token_status = None
if cached_user and hasattr(cached_user, "expires_at"):
# Add buffer time - consider token expired if less than 5 minutes remaining
buffer_time = 300 # 5 minutes
if cached_user.expires_at > (time.time() + buffer_time):
token_status = "valid"
else:
token_status = "expired"
else:
token_status = "no cached user or missing expires_at"
logging.info(f"Cync token status: {token_status}")
if token_status == "valid" and cached_user is not None:
# Use cached token
self.auth = Auth(
session=self.session,
user=cached_user,
username=cync_email,
password=cync_password,
)
self.cync_user = cached_user
logging.info("Reusing valid cached token, no 2FA required.")
else:
# Need fresh login - clear any cached user that's expired
if token_status == "expired":
try:
os.remove(self.token_cache_path)
logging.info("Removed expired token cache")
except (OSError, FileNotFoundError):
pass
logging.info("Initializing new Auth instance...")
self.auth = Auth(
session=self.session,
username=cync_email,
password=cync_password,
)
try:
logging.info("Attempting fresh login...")
self.cync_user = await self.auth.login()
_log_token_state(self.cync_user, "After fresh login")
self._save_cached_user(self.cync_user)
logging.info("Fresh login successful")
except TwoFactorRequiredError:
twofa_code = os.getenv("CYNC_2FA_CODE")
if not twofa_code:
print("Cync 2FA required. Please enter your code:")
twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
self.cync_user = await self.auth.login(
two_factor_code=twofa_code
)
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
logging.info(
"2FA failure details: Code=%s, User=%s",
twofa_code,
self.cync_user,
)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
# Create new Cync API instance
try:
logging.info("Creating Cync API instance...")
_log_token_state(self.auth.user, "Before Cync.create")
self.cync_api = await Cync.create(self.auth)
logging.info("Cync API connection established successfully")
except Exception as e:
logging.error("Failed to create Cync API instance")
logging.error("Exception details: %s", str(e))
logging.error("Traceback:\n%s", traceback.format_exc())
# Save diagnostic info
diagnostic_data = {
"timestamp": datetime.now().isoformat(),
"error_type": type(e).__name__,
"error_message": str(e),
"auth_state": {
"has_auth": bool(self.auth),
"has_user": bool(getattr(self.auth, "user", None)),
"user_state": {
"access_token": _mask_token(
getattr(self.auth.user, "access_token", None)
)
if self.auth and self.auth.user
else None,
"refresh_token": _mask_token(
getattr(self.auth.user, "refresh_token", None)
)
if self.auth and self.auth.user
else None,
"expires_at": getattr(self.auth.user, "expires_at", None)
if self.auth and self.auth.user
else None,
}
if self.auth and self.auth.user
else None,
},
}
diagnostic_file = f"cync_api_failure-{int(time.time())}.json"
try:
with open(diagnostic_file, "w") as f:
json.dump(diagnostic_data, f, indent=2)
logging.info(
f"Saved API creation diagnostic data to {diagnostic_file}"
)
except Exception as save_error:
logging.error(f"Failed to save diagnostic data: {save_error}")
raise
# Final validation
if (
not self.cync_api
or not self.session
or getattr(self.session, "closed", True)
):
logging.error("Connection validation failed after setup")
_log_token_state(
getattr(self.auth, "user", None), "Failed connection validation"
)
raise Exception("Failed to establish proper Cync connection")
"""
Lighting Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None:
    """Set up state, lifecycle hooks and routes for the lighting endpoints.

    Loads environment variables, creates the Redis client, reads the Cync
    configuration from the environment, registers startup/shutdown handlers
    on *app*, and adds the GET and POST ``/lighting/state`` routes.

    Args:
        app: The FastAPI application that receives the handlers and routes.
        util: Stored on the instance as ``self.util``.
        constants: Stored on the instance as ``self.constants``.
    """
    load_dotenv()
    self.app: FastAPI = app
    self.util = util
    self.constants = constants
    self.redis_client = redis.Redis(
        password=private.REDIS_PW, decode_responses=True
    )
    self.lighting_key = "lighting:state"

    # Cync config
    self.cync_email = os.getenv("CYNC_EMAIL")
    self.cync_password = os.getenv("CYNC_PASSWORD")
    self.cync_device_name = os.getenv("CYNC_DEVICE_NAME")
    self.token_cache_path = "cync_token.json"

    # Connection state; populated by ensure_cync_connection()
    self.session = None
    self.auth = None
    self.cync_user = None
    self.cync_api = None
    self.health_check_task: Optional[asyncio.Task] = None

    # Set up Cync connection at startup using FastAPI event
    @app.on_event("startup")
    async def startup_event():
        # Missing configuration aborts startup with an exception.
        required = {
            "CYNC_EMAIL": self.cync_email,
            "CYNC_PASSWORD": self.cync_password,
            "CYNC_DEVICE_NAME": self.cync_device_name,
        }
        missing_vars = [name for name, value in required.items() if not value]
        if missing_vars:
            raise Exception(
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )
        # Use ensure_cync_connection which has proper token caching
        try:
            await self.ensure_cync_connection()
            logging.info("Cync lighting system initialized successfully")
        except Exception as exc:
            # Don't raise - allow server to start, connection will be retried on first request
            logging.error(f"Failed to initialize Cync connection at startup: {exc}")
        # Schedule periodic token validation and connection health checks
        self.health_check_task = asyncio.create_task(self._schedule_health_checks())

    @app.on_event("shutdown")
    async def shutdown_event():
        # Cancel health check task
        task = self.health_check_task
        if task and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                logging.info("Health check task cancelled successfully")
        # Clean up connections
        await self._close_session_safely()
        logging.info("Cync lighting system shut down cleanly")

    def _route_guards():
        # Build fresh dependency objects for every route, so each route keeps
        # its own RateLimiter instance.
        return [
            Depends(RateLimiter(times=25, seconds=2)),
            Depends(get_current_user),
        ]

    # Register endpoints
    self.endpoints: dict = {
        "lighting/state": self.get_lighting_state,
    }
    for path, handler in self.endpoints.items():
        self.app.add_api_route(
            f"/{path}",
            handler,
            methods=["GET"],
            include_in_schema=True,
            dependencies=_route_guards(),
        )
    self.app.add_api_route(
        "/lighting/state",
        self.set_lighting_state,
        methods=["POST"],
        include_in_schema=True,
        dependencies=_route_guards(),
    )
async def _refresh_or_login(self):
if not self.auth:
logging.error("Auth object is not initialized.")
raise Exception("Cync authentication not initialized.")
try:
user = getattr(self.auth, "user", None)
_log_token_state(user, "Before refresh attempt")
if user and hasattr(user, "expires_at") and user.expires_at > time.time():
refresh = getattr(self.auth, "async_refresh_user_token", None)
if callable(refresh):
try:
logging.info("Attempting token refresh...")
result = refresh()
if inspect.isawaitable(result):
await result
logging.info(
"Token refresh completed successfully (awaited)"
)
else:
logging.info("Token refresh completed (non-awaitable)")
except AuthFailedError as e:
logging.error("Token refresh failed with AuthFailedError")
logging.error("Exception details: %s", str(e))
logging.error("Traceback:\n%s", traceback.format_exc())
# Save diagnostic info to file
diagnostic_data = {
"timestamp": datetime.now().isoformat(),
"error_type": "AuthFailedError",
"error_message": str(e),
"user_state": {
"access_token": _mask_token(
getattr(user, "access_token", None)
),
"refresh_token": _mask_token(
getattr(user, "refresh_token", None)
),
"expires_at": getattr(user, "expires_at", None),
},
}
try:
diagnostic_file = (
f"cync_auth_failure-{int(time.time())}.json"
)
with open(diagnostic_file, "w") as f:
json.dump(diagnostic_data, f, indent=2)
logging.info(f"Saved diagnostic data to {diagnostic_file}")
except Exception as save_error:
logging.error(
f"Failed to save diagnostic data: {save_error}"
)
raise
login = getattr(self.auth, "login", None)
if callable(login):
try:
result = login()
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in successfully.")
except TwoFactorRequiredError:
twofa_code = os.getenv("CYNC_2FA_CODE")
if not twofa_code:
# Prompt interactively if not set
print("Cync 2FA required. Please enter your code:")
twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
result = login(two_factor_code=twofa_code)
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
logging.info(
"2FA failure details: Code=%s, User=%s",
twofa_code,
self.cync_user,
)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.")
else:
raise Exception("Auth object missing login method.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
except Exception as e:
logging.error("Unexpected error during authentication: %s", e)
raise
async def _schedule_health_checks(self):
"""Periodic health checks and token validation."""
while True:
try:
await asyncio.sleep(300) # Check every 5 minutes
# Check token expiration (refresh if less than 10 minutes left)
if self.cync_user and hasattr(self.cync_user, "expires_at"):
expires_at = getattr(self.cync_user, "expires_at", 0)
time_until_expiry = expires_at - time.time()
if time_until_expiry < 600: # Less than 10 minutes
logging.info(
f"Token expires in {int(time_until_expiry / 60)} minutes. Refreshing..."
)
try:
await self._refresh_or_login()
except Exception as e:
logging.error(
f"Token refresh failed during health check: {e}"
)
# Test connection health
if not await self._test_connection_health():
logging.warning(
"Connection health check failed. Will reconnect on next API call."
)
except asyncio.CancelledError:
logging.info("Health check task cancelled")
break
except Exception as e:
logging.error(f"Error during periodic health check: {e}")
# Continue the loop even on errors
def _load_cached_user(self):
try:
if os.path.exists(self.token_cache_path):
with open(self.token_cache_path, "r") as f:
data = json.load(f)
return User(
access_token=data["access_token"],
refresh_token=data["refresh_token"],
authorize=data["authorize"],
user_id=data["user_id"],
expires_at=data["expires_at"],
)
except Exception as e:
logging.warning("Failed to load cached Cync user: %s", e)
return None
def _save_cached_user(self, user):
try:
data = {
"access_token": user.access_token,
"refresh_token": user.refresh_token,
"authorize": user.authorize,
"user_id": user.user_id,
"expires_at": user.expires_at,
}
with open(self.token_cache_path, "w") as f:
json.dump(data, f)
logging.info("Saved Cync user tokens to disk.")
except Exception as e:
logging.warning("Failed to save Cync user tokens: %s", e)
async def get_lighting_state(self) -> JSONResponse:
    """
    Get the current lighting state.
    Returns:
    - **JSONResponse**: Contains the current lighting state.
    """
    # Serves the JSON stored under self.lighting_key in Redis; when nothing
    # is stored, a default "off" state is returned instead. Any failure is
    # logged and surfaced as HTTP 500.
    try:
        stored = self.redis_client.get(self.lighting_key)
        if not stored:
            # Default state
            return JSONResponse(
                content={
                    "power": "off",
                    "brightness": 50,
                    "color": {"r": 255, "g": 255, "b": 255},
                }
            )
        return JSONResponse(content=json.loads(str(stored)))
    except Exception as exc:
        logging.error("Error getting lighting state: %s", exc)
        raise HTTPException(status_code=500, detail="Internal server error")
async def set_lighting_state(self, request: Request) -> JSONResponse:
    """
    Set the lighting state and apply it to the Cync device.
    """
    # Flow: parse body -> validate -> persist to Redis -> apply to the device
    # with up to three attempts (forcing a reconnect on retries).
    #
    # Responses:
    #   400 - body is not valid JSON, is not a JSON object, or carries an
    #         invalid power / brightness / color value.
    #   500 - any other failure, including all device attempts failing.
    logging.info("=== LIGHTING STATE REQUEST RECEIVED ===")
    try:
        try:
            state = await request.json()
        except ValueError:
            # Malformed JSON is a client error, not a server fault.
            raise HTTPException(
                status_code=400, detail="Request body must be valid JSON"
            )
        logging.info(f"Requested state: {state}")

        # Validate everything BEFORE persisting or touching the device, so a
        # rejected payload is never stored and later served by the GET route.
        if not isinstance(state, dict):
            raise HTTPException(
                status_code=400, detail="State must be a JSON object"
            )
        power = state.get("power", "off")
        if power not in ["on", "off"]:
            raise HTTPException(
                status_code=400, detail=f"Invalid power state: {power}"
            )
        brightness = state.get("brightness", 50)
        if not isinstance(brightness, (int, float)) or not (0 <= brightness <= 100):
            raise HTTPException(
                status_code=400, detail=f"Invalid brightness: {brightness}"
            )

        # Color may arrive nested ({"color": {"r", "g", "b"}}) or flat
        # ("red"/"green"/"blue" keys); both forms get the same range check.
        channel_names = ["red", "green", "blue"]
        color = state.get("color")
        if (
            color
            and isinstance(color, dict)
            and all(k in color for k in ["r", "g", "b"])
        ):
            rgb = (color["r"], color["g"], color["b"])
        elif all(k in state for k in channel_names):
            rgb = (state["red"], state["green"], state["blue"])
        else:
            rgb = None
        if rgb is not None:
            for val, name in zip(rgb, channel_names):
                if not isinstance(val, int) or not (0 <= val <= 255):
                    raise HTTPException(
                        status_code=400, detail=f"Invalid {name} color value: {val}"
                    )

        # Store in Redis
        self.redis_client.set(self.lighting_key, json.dumps(state))
        await self.ensure_cync_connection()

        # Apply to Cync device with robust retry and error handling
        max_retries = 3
        last_exception: Exception = Exception("No attempts made")
        for attempt in range(max_retries):
            try:
                # Ensure connection before each attempt
                force_reconnect = attempt > 0  # Force reconnect on retries
                await self.ensure_cync_connection(force_reconnect=force_reconnect)
                if not self.cync_api:
                    raise Exception("Cync API not available after connection setup")
                logging.info(
                    f"Attempt {attempt + 1}/{max_retries}: Getting devices from Cync API..."
                )
                devices = self.cync_api.get_devices()
                if not devices:
                    raise Exception("No devices returned from Cync API")
                logging.info(
                    f"Devices returned: {[getattr(d, 'name', 'unnamed') for d in devices]}"
                )
                light = next(
                    (
                        d
                        for d in devices
                        if hasattr(d, "name") and d.name == self.cync_device_name
                    ),
                    None,
                )
                if not light:
                    available_devices = [
                        getattr(d, "name", "unnamed") for d in devices
                    ]
                    raise Exception(
                        f"Device '{self.cync_device_name}' not found. Available devices: {available_devices}"
                    )
                logging.info(
                    f"Selected device: {getattr(light, 'name', 'unnamed')}"
                )

                # Execute device operations
                operations_completed = []
                # Set power
                if power == "on":
                    result = await light.turn_on()
                    operations_completed.append(f"turn_on: {result}")
                else:
                    result = await light.turn_off()
                    operations_completed.append(f"turn_off: {result}")
                # Set brightness (only when the caller supplied one)
                if "brightness" in state:
                    result = await light.set_brightness(brightness)
                    operations_completed.append(
                        f"set_brightness({brightness}): {result}"
                    )
                # Set color
                if rgb:
                    result = await light.set_rgb(rgb)
                    operations_completed.append(f"set_rgb({rgb}): {result}")
                logging.info(
                    f"All operations completed successfully: {operations_completed}"
                )
                break  # Success, exit retry loop
            except (
                aiohttp.ClientConnectionError,
                aiohttp.ClientOSError,
                aiohttp.ServerDisconnectedError,
                aiohttp.ClientConnectorError,
                ConnectionResetError,
                ConnectionError,
                OSError,
                asyncio.TimeoutError,
            ) as e:
                last_exception = e
                logging.warning(
                    f"Connection/network error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
                )
                if attempt < max_retries - 1:
                    # Wait a bit before retry to allow network/server recovery.
                    # Exponential backoff: 1s after the first failure, 2s
                    # after the second; the final failure does not sleep.
                    await asyncio.sleep(2**attempt)
                    continue
            except (AuthFailedError, TwoFactorRequiredError) as e:
                last_exception = e
                logging.error(
                    f"Authentication error (attempt {attempt + 1}/{max_retries}): {e}"
                )
                if attempt < max_retries - 1:
                    # Clear cached tokens on auth errors
                    try:
                        os.remove(self.token_cache_path)
                        logging.info("Cleared token cache due to auth error")
                    except OSError:
                        # Best effort: nothing to clear or not removable.
                        pass
                    await asyncio.sleep(1)
                    continue
            except Exception as e:
                last_exception = e
                error_msg = f"Unexpected error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
                logging.error(error_msg)
                # On unexpected errors, try reconnecting for next attempt
                if attempt < max_retries - 1:
                    logging.warning(
                        "Forcing full reconnection due to unexpected error..."
                    )
                    await asyncio.sleep(1)
                    continue
            # Only reached when the final attempt failed: every earlier
            # failure hits `continue` and success hits `break`.
            logging.error(
                f"All {max_retries} attempts failed. Last error: {type(last_exception).__name__}: {last_exception}"
            )
            raise last_exception

        logging.info(
            "Successfully applied state to device '%s': %s",
            self.cync_device_name,
            state,
        )
        return JSONResponse(
            content={
                "message": "Lighting state updated and applied",
                "state": state,
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        logging.error("Error setting lighting state: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error")