diff --git a/base.py b/base.py index 5f15fb8..8b546bf 100644 --- a/base.py +++ b/base.py @@ -34,27 +34,27 @@ async def lifespan(app: FastAPI): # Startup uvicorn_access_logger = logging.getLogger("uvicorn.access") uvicorn_access_logger.disabled = True - + # Start Radio playlists if "radio" in _routes and hasattr(_routes["radio"], "on_start"): await _routes["radio"].on_start() - + # Start endpoint background tasks if "trip" in _routes and hasattr(_routes["trip"], "startup"): await _routes["trip"].startup() if "lighting" in _routes and hasattr(_routes["lighting"], "startup"): await _routes["lighting"].startup() - + logger.info("Application startup complete") - + yield - + # Shutdown if "lighting" in _routes and hasattr(_routes["lighting"], "shutdown"): await _routes["lighting"].shutdown() if "trip" in _routes and hasattr(_routes["trip"], "shutdown"): await _routes["trip"].shutdown() - + logger.info("Application shutdown complete") @@ -141,24 +141,26 @@ End Blacklisted Routes Actionable Routes """ -_routes.update({ - "randmsg": importlib.import_module("endpoints.rand_msg").RandMsg( - app, util, constants - ), - "lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch( - app, util, constants - ), - "yt": importlib.import_module("endpoints.yt").YT(app, util, constants), - "radio": importlib.import_module("endpoints.radio").Radio( - app, util, constants, loop - ), - "meme": importlib.import_module("endpoints.meme").Meme(app, util, constants), - "trip": importlib.import_module("endpoints.rip").RIP(app, util, constants), - "auth": importlib.import_module("endpoints.auth").Auth(app), - "lighting": importlib.import_module("endpoints.lighting").Lighting( - app, util, constants - ), -}) +_routes.update( + { + "randmsg": importlib.import_module("endpoints.rand_msg").RandMsg( + app, util, constants + ), + "lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch( + app, util, constants + ), + "yt": importlib.import_module("endpoints.yt").YT(app, util, constants), + "radio": importlib.import_module("endpoints.radio").Radio( + app, util, constants, loop + ), + "meme": importlib.import_module("endpoints.meme").Meme(app, util, constants), + "trip": importlib.import_module("endpoints.rip").RIP(app, util, constants), + "auth": importlib.import_module("endpoints.auth").Auth(app), + "lighting": importlib.import_module("endpoints.lighting").Lighting( + app, util, constants + ), + } +) # Misc endpoint depends on radio endpoint instance radio_endpoint = _routes.get("radio") diff --git a/endpoints/lighting.py b/endpoints/lighting.py index a1314b1..853aadb 100644 --- a/endpoints/lighting.py +++ b/endpoints/lighting.py @@ -42,6 +42,7 @@ logger = logging.getLogger(__name__) @dataclass class CyncConnectionState: """Track the state of our Cync connection.""" + session: Optional[aiohttp.ClientSession] = None auth: Optional[Auth] = None cync_api: Optional[Cync] = None @@ -53,52 +54,51 @@ class CyncConnectionState: class Lighting: """ Cync Lighting Controller - + Manages authentication and device control for Cync smart lights. Uses pycync library which maintains a TCP connection for device commands. """ - + # Configuration TOKEN_EXPIRY_BUFFER = 300 # Consider token expired 5 min before actual expiry CONNECTION_READY_TIMEOUT = 15 # Max seconds to wait for TCP connection to be ready COMMAND_DELAY = 0.3 # Delay between sequential commands MAX_RETRIES = 3 - + def __init__(self, app: FastAPI, util: Any, constants: Any) -> None: load_dotenv() - + self.app = app self.util = util self.constants = constants - + # Redis for state persistence self.redis_client = redis.Redis( - password=private.REDIS_PW, - decode_responses=True + password=private.REDIS_PW, decode_responses=True ) self.lighting_key = "lighting:state" - + # Cync configuration from environment self.cync_email = os.getenv("CYNC_EMAIL") self.cync_password = os.getenv("CYNC_PASSWORD") self.cync_device_name = os.getenv("CYNC_DEVICE_NAME") self.token_cache_path = "cync_token.json" - + # Connection state self._state = CyncConnectionState() self._connection_lock = asyncio.Lock() self._health_task: Optional[asyncio.Task] = None - + # Register routes self._register_routes() - + def _register_routes(self) -> None: """Register FastAPI routes.""" common_deps = [ Depends(RateLimiter(times=25, seconds=2)), Depends(get_current_user), ] - + self.app.add_api_route( "/lighting/state", self.get_lighting_state, @@ -106,7 +106,7 @@ class Lighting: dependencies=common_deps, include_in_schema=False, ) - + self.app.add_api_route( "/lighting/state", self.set_lighting_state, @@ -114,25 +114,25 @@ class Lighting: dependencies=common_deps, include_in_schema=False, ) - + # ========================================================================= # Lifecycle Management # ========================================================================= - + async def startup(self) -> None: """Initialize on app startup. Call from lifespan context manager.""" self._validate_config() - + try: await self._connect() logger.info("Cync lighting initialized successfully") except Exception as e: logger.error(f"Failed to initialize Cync at startup: {e}") # Don't raise - allow app to start, will retry on first request - + # Start background health monitoring self._health_task = asyncio.create_task(self._health_monitor()) - + async def shutdown(self) -> None: """Cleanup on app shutdown. Call from lifespan context manager.""" if self._health_task: @@ -141,10 +141,10 @@ class Lighting: await self._health_task except asyncio.CancelledError: pass - + await self._disconnect() logger.info("Cync lighting shut down") - + def _validate_config(self) -> None: """Validate required environment variables.""" missing = [] @@ -154,18 +154,18 @@ class Lighting: missing.append("CYNC_PASSWORD") if not self.cync_device_name: missing.append("CYNC_DEVICE_NAME") - + if missing: raise RuntimeError(f"Missing required env vars: {', '.join(missing)}") - + # ========================================================================= # Connection Management # ========================================================================= - + async def _connect(self, force: bool = False) -> None: """ Establish connection to Cync cloud. - + This creates the aiohttp session, authenticates, and initializes the pycync API which starts its TCP connection. """ @@ -173,124 +173,124 @@ class Lighting: # Check if we need to connect if not force and self._is_connection_valid(): return - + logger.info("Establishing Cync connection...") - + # Clean up existing connection await self._disconnect_unlocked() - + # Create HTTP session timeout = aiohttp.ClientTimeout(total=30, connect=10) self._state.session = aiohttp.ClientSession(timeout=timeout) - + # Authenticate await self._authenticate() - + # Create Cync API (starts TCP connection) logger.info("Creating Cync API instance...") assert self._state.auth is not None # Set by _authenticate self._state.cync_api = await Cync.create(self._state.auth) - + # Wait for TCP connection to be ready await self._wait_for_connection_ready() - + self._state.connected_at = time.time() logger.info("Cync connection established") - + async def _disconnect(self) -> None: """Disconnect and cleanup resources.""" async with self._connection_lock: await self._disconnect_unlocked() - + async def _disconnect_unlocked(self) -> None: """Disconnect without acquiring lock (internal use).""" # Shutdown pycync TCP connection if self._state.cync_api: try: # pycync's command client has a shut_down method - client = getattr(self._state.cync_api, '_command_client', None) + client = getattr(self._state.cync_api, "_command_client", None) if client: await client.shut_down() except Exception as e: logger.warning(f"Error shutting down Cync client: {e}") - + # Close HTTP session if self._state.session and not self._state.session.closed: await self._state.session.close() await asyncio.sleep(0.1) # Allow cleanup - + # Reset state self._state = CyncConnectionState() - + def _is_connection_valid(self) -> bool: """Check if current connection is usable.""" if not self._state.cync_api or not self._state.session: return False - + if self._state.session.closed: return False - + # Check token expiry if self._is_token_expired(): logger.info("Token expired or expiring soon") return False - + return True - + def _is_token_expired(self) -> bool: """Check if token is expired or will expire soon.""" if not self._state.user: return True - - expires_at = getattr(self._state.user, 'expires_at', 0) + + expires_at = getattr(self._state.user, "expires_at", 0) return expires_at < (time.time() + self.TOKEN_EXPIRY_BUFFER) - + async def _wait_for_connection_ready(self) -> None: """ Wait for pycync TCP connection to be fully ready. - + pycync's TCP manager waits for login acknowledgment before sending any commands. We need to wait for this to complete. """ if not self._state.cync_api: raise RuntimeError("Cync API not initialized") - - client = getattr(self._state.cync_api, '_command_client', None) + + client = getattr(self._state.cync_api, "_command_client", None) if not client: logger.warning("Could not access command client") return - - tcp_manager = getattr(client, '_tcp_manager', None) + + tcp_manager = getattr(client, "_tcp_manager", None) if not tcp_manager: logger.warning("Could not access TCP manager") return - + # Wait for login to be acknowledged start = time.time() - while not getattr(tcp_manager, '_login_acknowledged', False): + while not getattr(tcp_manager, "_login_acknowledged", False): if time.time() - start > self.CONNECTION_READY_TIMEOUT: raise TimeoutError("Timed out waiting for Cync login acknowledgment") await asyncio.sleep(0.2) logger.debug("Waiting for Cync TCP login acknowledgment...") - + # Give a tiny bit more time for device probing to start await asyncio.sleep(0.5) logger.info(f"Cync TCP connection ready (took {time.time() - start:.1f}s)") - + # ========================================================================= # Authentication # ========================================================================= - + async def _authenticate(self) -> None: """Authenticate with Cync, using cached token if valid.""" # Try cached token first cached_user = self._load_cached_token() - + # These are validated by _validate_config at startup assert self._state.session is not None assert self.cync_email is not None assert self.cync_password is not None - + if cached_user and not self._is_user_token_expired(cached_user): logger.info("Using cached Cync token") self._state.auth = Auth( @@ -301,7 +301,7 @@ class Lighting: ) self._state.user = cached_user return - + # Need fresh login logger.info("Performing fresh Cync login...") self._state.auth = Auth( @@ -309,7 +309,7 @@ class Lighting: username=self.cync_email, password=self.cync_password, ) - + try: self._state.user = await self._state.auth.login() self._save_cached_token(self._state.user) @@ -319,14 +319,14 @@ class Lighting: except AuthFailedError as e: logger.error(f"Cync authentication failed: {e}") raise - + async def _handle_2fa(self) -> None: """Handle 2FA authentication.""" import sys - + # Try environment variable first twofa_code = os.getenv("CYNC_2FA_CODE") - + # If not set, prompt interactively if not twofa_code: print("\n" + "=" * 50) @@ -336,23 +336,22 @@ class Lighting: print("Enter the code below (you have 60 seconds):") print("=" * 50) sys.stdout.flush() - + # Use asyncio to read with timeout try: loop = asyncio.get_event_loop() twofa_code = await asyncio.wait_for( - loop.run_in_executor(None, input, "2FA Code: "), - timeout=60.0 + loop.run_in_executor(None, input, "2FA Code: "), timeout=60.0 ) twofa_code = twofa_code.strip() except asyncio.TimeoutError: logger.error("2FA code entry timed out") raise RuntimeError("2FA code entry timed out") - + if not twofa_code: logger.error("No 2FA code provided") raise RuntimeError("Cync 2FA required but no code provided") - + logger.info("Retrying Cync login with 2FA code") try: assert self._state.auth is not None @@ -362,48 +361,48 @@ class Lighting: except Exception as e: logger.error(f"Cync 2FA login failed: {e}") raise - + def _is_user_token_expired(self, user: User) -> bool: """Check if a user's token is expired.""" - expires_at = getattr(user, 'expires_at', 0) + expires_at = getattr(user, "expires_at", 0) return expires_at < (time.time() + self.TOKEN_EXPIRY_BUFFER) - + def _load_cached_token(self) -> Optional[User]: """Load cached authentication token from disk.""" try: if not os.path.exists(self.token_cache_path): return None - - with open(self.token_cache_path, 'r') as f: + + with open(self.token_cache_path, "r") as f: data = json.load(f) - + return User( - access_token=data['access_token'], - refresh_token=data['refresh_token'], - authorize=data['authorize'], - user_id=data['user_id'], - expires_at=data['expires_at'], + access_token=data["access_token"], + refresh_token=data["refresh_token"], + authorize=data["authorize"], + user_id=data["user_id"], + expires_at=data["expires_at"], ) except Exception as e: logger.warning(f"Failed to load cached token: {e}") return None - + def _save_cached_token(self, user: User) -> None: """Save authentication token to disk.""" try: data = { - 'access_token': user.access_token, - 'refresh_token': user.refresh_token, - 'authorize': user.authorize, - 'user_id': user.user_id, - 'expires_at': user.expires_at, + "access_token": user.access_token, + "refresh_token": user.refresh_token, + "authorize": user.authorize, + "user_id": user.user_id, + "expires_at": user.expires_at, } - with open(self.token_cache_path, 'w') as f: + with open(self.token_cache_path, "w") as f: json.dump(data, f) logger.debug("Saved Cync token to disk") except Exception as e: logger.warning(f"Failed to save token: {e}") - + def _clear_cached_token(self) -> None: """Remove cached token file.""" try: @@ -412,17 +411,17 @@ class Lighting: logger.info("Cleared cached token") except OSError: pass - + # ========================================================================= # Health Monitoring # ========================================================================= - + async def _health_monitor(self) -> None: """Background task to monitor connection health and refresh tokens.""" while True: try: await asyncio.sleep(300) # Check every 5 minutes - + # Proactively refresh if token is expiring if self._is_token_expired(): logger.info("Token expiring, proactively reconnecting...") @@ -430,38 +429,38 @@ class Lighting: await self._connect(force=True) except Exception as e: logger.error(f"Proactive reconnection failed: {e}") - + except asyncio.CancelledError: break except Exception as e: logger.error(f"Health monitor error: {e}") - + # ========================================================================= # Device Control # ========================================================================= - + async def _get_device(self): """Get the target light device.""" if not self._state.cync_api: raise RuntimeError("Cync not connected") - + devices = self._state.cync_api.get_devices() if not devices: raise RuntimeError("No devices found") - + device = next( - (d for d in devices if getattr(d, 'name', None) == self.cync_device_name), - None + (d for d in devices if getattr(d, "name", None) == self.cync_device_name), + None, ) - + if not device: - available = [getattr(d, 'name', 'unnamed') for d in devices] + available = [getattr(d, "name", "unnamed") for d in devices] raise RuntimeError( f"Device '{self.cync_device_name}' not found. Available: {available}" ) - + return device - + async def _send_commands( self, power: str, @@ -470,13 +469,13 @@ class Lighting: ) -> None: """ Send commands to the light device. - + Commands are sent sequentially with small delays to ensure the TCP connection processes each one. """ device = await self._get_device() logger.info(f"Sending commands to device: {device.name}") - + # Power if power == "on": await device.turn_on() @@ -485,107 +484,128 @@ class Lighting: await device.turn_off() logger.debug("Sent turn_off") await asyncio.sleep(self.COMMAND_DELAY) - + # Brightness if brightness is not None: await device.set_brightness(brightness) logger.debug(f"Sent brightness: {brightness}") await asyncio.sleep(self.COMMAND_DELAY) - + # Color if rgb: await device.set_rgb(rgb) logger.debug(f"Sent RGB: {rgb}") await asyncio.sleep(self.COMMAND_DELAY) - + self._state.last_command_at = time.time() - + # ========================================================================= # API Endpoints # ========================================================================= - + async def get_lighting_state(self, user=Depends(get_current_user)) -> JSONResponse: """Get the current lighting state from Redis.""" - if "lighting" not in user.get("roles", []) and "admin" not in user.get("roles", []): + if "lighting" not in user.get("roles", []) and "admin" not in user.get( + "roles", [] + ): raise HTTPException(status_code=403, detail="Insufficient permissions") try: state = self.redis_client.get(self.lighting_key) if state: return JSONResponse(content=json.loads(str(state))) - + # Default state - return JSONResponse(content={ - "power": "off", - "brightness": 50, - "color": {"r": 255, "g": 255, "b": 255}, - }) + return JSONResponse( + content={ + "power": "off", + "brightness": 50, + "color": {"r": 255, "g": 255, "b": 255}, + } + ) except Exception as e: logger.error(f"Error getting lighting state: {e}") raise HTTPException(status_code=500, detail="Internal server error") - - async def set_lighting_state(self, request: Request, - user=Depends(get_current_user)) -> JSONResponse: + + async def set_lighting_state( + self, request: Request, user=Depends(get_current_user) + ) -> JSONResponse: """Set the lighting state and apply to Cync device.""" try: - if "lighting" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + if "lighting" not in user.get("roles", []) and "admin" not in user.get( + "roles", [] + ): + raise HTTPException(status_code=403, detail="Insufficient permissions") state = await request.json() logger.info(f"Lighting request: {state}") - + # Validate if not isinstance(state, dict): - raise HTTPException(status_code=400, detail="State must be a JSON object") - + raise HTTPException( + status_code=400, detail="State must be a JSON object" + ) + power, brightness, rgb = self._parse_state(state) - + # Save to Redis (even if device command fails) self.redis_client.set(self.lighting_key, json.dumps(state)) - + # Apply to device with retries await self._apply_state_with_retry(power, brightness, rgb) - - logger.info(f"Successfully applied state: power={power}, brightness={brightness}, rgb={rgb}") - return JSONResponse(content={ - "message": "Lighting state updated", - "state": state, - }) - + + logger.info( + f"Successfully applied state: power={power}, brightness={brightness}, rgb={rgb}" + ) + return JSONResponse( + content={ + "message": "Lighting state updated", + "state": state, + } + ) + except HTTPException: raise except Exception as e: logger.error(f"Error setting lighting state: {e}") raise HTTPException(status_code=500, detail=str(e)) - + def _parse_state(self, state: dict) -> tuple[str, Optional[int], Optional[tuple]]: """Parse and validate lighting state from request.""" # Power power = state.get("power", "off") if power not in ("on", "off"): raise HTTPException(status_code=400, detail=f"Invalid power: {power}") - + # Brightness brightness = None if "brightness" in state: brightness = state["brightness"] if not isinstance(brightness, (int, float)) or not (0 <= brightness <= 100): - raise HTTPException(status_code=400, detail=f"Invalid brightness: {brightness}") + raise HTTPException( + status_code=400, detail=f"Invalid brightness: {brightness}" + ) brightness = int(brightness) - + # Color rgb = None color = state.get("color") - if color and isinstance(color, dict) and all(k in color for k in ("r", "g", "b")): + if ( + color + and isinstance(color, dict) + and all(k in color for k in ("r", "g", "b")) + ): rgb = (color["r"], color["g"], color["b"]) elif all(k in state for k in ("red", "green", "blue")): rgb = (state["red"], state["green"], state["blue"]) - + if rgb: for i, name in enumerate(("red", "green", "blue")): if not isinstance(rgb[i], int) or not (0 <= rgb[i] <= 255): - raise HTTPException(status_code=400, detail=f"Invalid {name}: {rgb[i]}") - + raise HTTPException( + status_code=400, detail=f"Invalid {name}: {rgb[i]}" + ) + return power, brightness, rgb - + async def _apply_state_with_retry( self, power: str, @@ -594,35 +614,37 @@ class Lighting: ) -> None: """Apply state to device with connection retry logic.""" last_error: Optional[Exception] = None - + for attempt in range(self.MAX_RETRIES): try: # Ensure connection (force reconnect on retries) await self._connect(force=(attempt > 0)) - + # Send commands await self._send_commands(power, brightness, rgb) return # Success - + except (AuthFailedError, TwoFactorRequiredError) as e: last_error = e logger.warning(f"Auth error on attempt {attempt + 1}: {e}") self._clear_cached_token() - + except TimeoutError as e: last_error = e logger.warning(f"Timeout on attempt {attempt + 1}: {e}") - + except Exception as e: last_error = e - logger.warning(f"Error on attempt {attempt + 1}: {type(e).__name__}: {e}") - + logger.warning( + f"Error on attempt {attempt + 1}: {type(e).__name__}: {e}" + ) + # Wait before retry (exponential backoff) if attempt < self.MAX_RETRIES - 1: - wait_time = 2 ** attempt + wait_time = 2**attempt logger.info(f"Retrying in {wait_time}s...") await asyncio.sleep(wait_time) - + # All retries failed logger.error(f"All {self.MAX_RETRIES} attempts failed") raise last_error or RuntimeError("Failed to apply lighting state") diff --git a/endpoints/lyric_search.py b/endpoints/lyric_search.py index 38cfa91..683efc6 100644 --- a/endpoints/lyric_search.py +++ b/endpoints/lyric_search.py @@ -96,9 +96,11 @@ class LyricSearch(FastAPI): handler, methods=["POST"], include_in_schema=_schema_include, - dependencies=[Depends(RateLimiter(times=times, seconds=seconds))] - if not endpoint == "typeahead/lyrics" - else None, + dependencies=( + [Depends(RateLimiter(times=times, seconds=seconds))] + if not endpoint == "typeahead/lyrics" + else None + ), ) async def typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse: @@ -243,9 +245,9 @@ class LyricSearch(FastAPI): if i + line_count <= len(lyric_lines): # Combine consecutive lines with space separator combined_lines = [] - line_positions: list[ - tuple[int, int] - ] = [] # Track where each line starts in combined text + line_positions: list[tuple[int, int]] = ( + [] + ) # Track where each line starts in combined text combined_text_parts: list[str] = [] for j in range(line_count): diff --git a/endpoints/rip.py b/endpoints/rip.py index 1b317b7..0d05e43 100644 --- a/endpoints/rip.py +++ b/endpoints/rip.py @@ -75,7 +75,11 @@ class RIP(FastAPI): app.add_api_route( f"/{endpoint}", handler, - methods=["GET"] if endpoint not in ("trip/bulk_fetch", "trip/auth/check") else ["POST"], + methods=( + ["GET"] + if endpoint not in ("trip/bulk_fetch", "trip/auth/check") + else ["POST"] + ), include_in_schema=False, dependencies=dependencies, ) @@ -131,9 +135,11 @@ class RIP(FastAPI): "started_at": job.started_at, "ended_at": job.ended_at, "progress": progress, - "tracks": f"{succeeded_tracks} / {tracks_in}" - if isinstance(tracks_in, int) - else tracks_out, + "tracks": ( + f"{succeeded_tracks} / {tracks_in}" + if isinstance(tracks_in, int) + else tracks_out + ), "target": job.meta.get("target"), "quality": job.meta.get("quality", "Unknown"), } @@ -153,7 +159,7 @@ class RIP(FastAPI): - **Response**: JSON response with artists or 404. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") # support optional grouping to return one primary per display name # with `alternatives` for disambiguation (use ?group=true) group = bool(request.query_params.get("group", False)) @@ -177,7 +183,7 @@ class RIP(FastAPI): - **Response**: JSON response with albums or 404. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") albums = await self.trip_util.get_albums_by_artist_id(artist_id) if not albums: return Response(status_code=404, content="Not found") @@ -203,7 +209,7 @@ class RIP(FastAPI): - **Response**: JSON response with tracks or 404. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality) if not tracks: return Response(status_code=404, content="Not Found") @@ -225,7 +231,7 @@ class RIP(FastAPI): - **Response**: JSON response with tracks or 404. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") logging.critical("Searching for tracks by artist: %s, song: %s", artist, song) tracks = await self.trip_util.get_tracks_by_artist_song(artist, song) if not tracks: @@ -252,7 +258,7 @@ class RIP(FastAPI): - **Response**: JSON response with stream URL or 404. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") track = await self.trip_util.get_stream_url_by_track_id(track_id, quality) if not track: return Response(status_code=404, content="Not found") @@ -276,7 +282,7 @@ class RIP(FastAPI): - **Response**: JSON response with job info or error. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") if not data or not data.track_ids or not data.target: return JSONResponse( content={ @@ -329,7 +335,7 @@ class RIP(FastAPI): - **JSONResponse**: Job status and result or error. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") job = None try: # Try direct fetch first @@ -368,7 +374,7 @@ class RIP(FastAPI): - **JSONResponse**: List of jobs. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") jobs_info = [] seen = set() @@ -426,7 +432,7 @@ class RIP(FastAPI): ): """ Start Tidal device authorization flow. - + Returns a URL that the user must visit to authorize the application. After visiting the URL and authorizing, call /trip/auth/check to complete. @@ -434,8 +440,10 @@ class RIP(FastAPI): - **JSONResponse**: Contains device_code and verification_url. """ try: - if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + if "trip" not in user.get("roles", []) and "admin" not in user.get( + "roles", [] + ): + raise HTTPException(status_code=403, detail="Insufficient permissions") device_code, verification_url = await self.trip_util.start_device_auth() # Store device code for this session self._pending_device_codes[user.get("sub", "default")] = device_code @@ -458,18 +466,20 @@ class RIP(FastAPI): ): """ Check if Tidal device authorization is complete. - + Call this after the user has visited the verification URL. Returns: - **JSONResponse**: Contains success status and message. """ if "trip" not in user.get("roles", []) and "admin" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + raise HTTPException(status_code=403, detail="Insufficient permissions") device_code = self._pending_device_codes.get(user.get("sub", "default")) if not device_code: return JSONResponse( - content={"error": "No pending authorization. Call /trip/auth/start first."}, + content={ + "error": "No pending authorization. Call /trip/auth/start first." + }, status_code=400, ) @@ -479,11 +489,18 @@ class RIP(FastAPI): # Clear the pending code self._pending_device_codes.pop(user.get("sub", "default"), None) return JSONResponse( - content={"success": True, "message": "Tidal authorization complete!"} + content={ + "success": True, + "message": "Tidal authorization complete!", + } ) elif error == "pending": return JSONResponse( - content={"success": False, "pending": True, "message": "Waiting for user to authorize..."} + content={ + "success": False, + "pending": True, + "message": "Waiting for user to authorize...", + } ) else: return JSONResponse( diff --git a/endpoints/yt.py b/endpoints/yt.py index 4976bbd..1ccbe20 100644 --- a/endpoints/yt.py +++ b/endpoints/yt.py @@ -6,6 +6,7 @@ from typing import Optional, Union from utils.yt_utils import sign_video_id from .constructors import ValidYTSearchRequest + class YT(FastAPI): """ YT Endpoints diff --git a/lyric_search/models.py b/lyric_search/models.py index 68f3f81..2f04e87 100644 --- a/lyric_search/models.py +++ b/lyric_search/models.py @@ -99,7 +99,9 @@ POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "") # URL-encode the password to handle special characters encoded_password = urllib.parse.quote_plus(POSTGRES_PASSWORD) -DATABASE_URL: str = f"postgresql+asyncpg://{POSTGRES_USER}:{encoded_password}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}" +DATABASE_URL: str = ( + f"postgresql+asyncpg://{POSTGRES_USER}:{encoded_password}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}" +) async_engine: AsyncEngine = create_async_engine( DATABASE_URL, pool_size=20, max_overflow=10, pool_pre_ping=True, echo=False ) diff --git a/lyric_search/sources/cache.py b/lyric_search/sources/cache.py index 6887416..cb80d9c 100644 --- a/lyric_search/sources/cache.py +++ b/lyric_search/sources/cache.py @@ -91,8 +91,10 @@ class Cache: logging.debug( "Checking whether %s is already stored", artistsong.replace("\n", " - ") ) - check_query: str = 'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\ + check_query: str = ( + 'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\ <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1' + ) artistsong_split = artistsong.split("\n", maxsplit=1) artist = artistsong_split[0].lower() song = artistsong_split[1].lower() @@ -213,10 +215,8 @@ class Cache: lyrics = regex.sub(r"(
|\n|\r\n)", " / ", lyr_result.lyrics.strip()) lyrics = regex.sub(r"\s{2,}", " ", lyrics) - insert_query = ( - "INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics)\ + insert_query = "INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics)\ VALUES(?, ?, ?, ?, ?, ?, ?)" - ) params = ( lyr_result.src, time.time(), @@ -260,8 +260,10 @@ class Cache: if artist == "!" and song == "!": random_search = True - search_query: str = "SELECT id, artist, song, lyrics, src, confidence\ + search_query: str = ( + "SELECT id, artist, song, lyrics, src, confidence\ FROM lyrics ORDER BY RANDOM() LIMIT 1" + ) logging.info("Searching %s - %s on %s", artist, song, self.label) @@ -320,9 +322,11 @@ class Cache: self.cache_pre_query ) as _db_cursor: if not random_search: - search_query: str = 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\ + search_query: str = ( + 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\ WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\ <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 10' + ) search_params: tuple = ( artist.strip(), song.strip(), diff --git a/lyric_search/utils.py b/lyric_search/utils.py index 2f874fa..743b9c6 100644 --- a/lyric_search/utils.py +++ b/lyric_search/utils.py @@ -111,9 +111,8 @@ class DataUtils: """ def __init__(self) -> None: - self.lrc_regex = ( - regex.compile( # capture mm:ss and optional .xxx, then the lyric text - r""" + self.lrc_regex = regex.compile( # capture mm:ss and optional .xxx, then the lyric text + r""" \[ # literal “[” ( # 1st (and only) capture group: [0-9]{2} # two-digit minutes @@ -124,8 +123,7 @@ class DataUtils: \s* # optional whitespace (.*) # capture the rest of the line as words """, - regex.VERBOSE, - ) + regex.VERBOSE, ) self.scrub_regex_1: Pattern = regex.compile(r"(\[.*?\])(\s){0,}(\:){0,1}") self.scrub_regex_2: Pattern = regex.compile( diff --git a/utils/meme_util.py b/utils/meme_util.py index d2061ba..087e497 100644 --- a/utils/meme_util.py +++ b/utils/meme_util.py @@ -127,7 +127,9 @@ class MemeUtil: db_conn.row_factory = sqlite3.Row rows_per_page: int = 10 offset: int = (page - 1) * rows_per_page - query: str = "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT 10 OFFSET ?" + query: str = ( + "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT 10 OFFSET ?" + ) async with await db_conn.execute(query, (offset,)) as db_cursor: results = await db_cursor.fetchall() for result in results: diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py index 79d46de..0e50198 100644 --- a/utils/sr_wrapper.py +++ b/utils/sr_wrapper.py @@ -15,11 +15,11 @@ import time # Monkey-patch streamrip's Tidal client credentials BEFORE importing TidalClient import streamrip.client.tidal as _tidal_module # type: ignore # noqa: E402 + _tidal_module.CLIENT_ID = "fX2JxdmntZWK0ixT" _tidal_module.CLIENT_SECRET = "1Nn9AfDAjxrgJFJbKNWLeAyKGVGmINuXPPLHVXAvxAg=" _tidal_module.AUTH = aiohttp.BasicAuth( - login=_tidal_module.CLIENT_ID, - password=_tidal_module.CLIENT_SECRET + login=_tidal_module.CLIENT_ID, password=_tidal_module.CLIENT_SECRET ) from streamrip.client import TidalClient # type: ignore # noqa: E402 @@ -99,21 +99,21 @@ class SRUtil: async def start_keepalive(self) -> None: """Start the background keepalive task. - + This should be called once at startup to ensure the Tidal session stays alive even during idle periods. """ if self._keepalive_task and not self._keepalive_task.done(): logging.info("Tidal keepalive task already running") return - + # Ensure initial login try: await self._login_and_persist() logging.info("Initial Tidal login successful") except Exception as e: logging.warning("Initial Tidal login failed: %s", e) - + self._keepalive_task = asyncio.create_task(self._keepalive_runner()) logging.info("Tidal keepalive task started") @@ -132,14 +132,14 @@ class SRUtil: while True: try: await asyncio.sleep(self.KEEPALIVE_INTERVAL) - + # Check if we've had recent activity if self._last_successful_request: time_since_last = time.time() - self._last_successful_request if time_since_last < self.KEEPALIVE_INTERVAL: # Recent activity, no need to ping continue - + # Check if token is expiring soon and proactively refresh if self._is_token_expiring_soon(): logging.info("Tidal keepalive: Token expiring soon, refreshing...") @@ -149,7 +149,7 @@ class SRUtil: except Exception as e: logging.warning("Tidal keepalive: Token refresh failed: %s", e) continue - + # Check if session is stale if self._is_session_stale(): logging.info("Tidal keepalive: Session stale, refreshing...") @@ -157,9 +157,11 @@ class SRUtil: await self._login_and_persist(force=True) logging.info("Tidal keepalive: Session refresh successful") except Exception as e: - logging.warning("Tidal keepalive: Session refresh failed: %s", e) + logging.warning( + "Tidal keepalive: Session refresh failed: %s", e + ) continue - + # Make a lightweight API call to keep the session alive if self.streamrip_client.logged_in: try: @@ -178,7 +180,7 @@ class SRUtil: await self._login_and_persist(force=True) except Exception: pass - + except asyncio.CancelledError: logging.info("Tidal keepalive task cancelled") break @@ -195,7 +197,9 @@ class SRUtil: tidal.access_token = cached.get("access_token", "") tidal.refresh_token = cached.get("refresh_token", "") tidal.token_expiry = cached.get("token_expiry", "") - tidal.country_code = cached.get("country_code", os.getenv("tidal_country_code", "")) + tidal.country_code = cached.get( + "country_code", os.getenv("tidal_country_code", "") + ) else: tidal.user_id = os.getenv("tidal_user_id", "") tidal.access_token = os.getenv("tidal_access_token", "") @@ -212,7 +216,9 @@ class SRUtil: with open(TIDAL_TOKEN_CACHE_PATH, "r") as f: data = json.load(f) # Validate required fields exist - if all(k in data for k in ("access_token", "refresh_token", "token_expiry")): + if all( + k in data for k in ("access_token", "refresh_token", "token_expiry") + ): logging.info("Loaded Tidal tokens from cache") return data except Exception as e: @@ -248,22 +254,25 @@ class SRUtil: async def start_device_auth(self) -> tuple[str, str]: """Start device authorization flow. - + Returns: tuple: (device_code, verification_url) - User should visit the URL to authorize. """ - if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session: + if ( + not hasattr(self.streamrip_client, "session") + or not self.streamrip_client.session + ): self.streamrip_client.session = await self.streamrip_client.get_session() - + device_code, verification_url = await self.streamrip_client._get_device_code() return device_code, verification_url async def check_device_auth(self, device_code: str) -> tuple[bool, Optional[str]]: """Check if user has completed device authorization. - + Args: device_code: The device code from start_device_auth() - + Returns: tuple: (success, error_message) - (True, None) if auth completed successfully @@ -271,7 +280,7 @@ class SRUtil: - (False, error_message) if auth failed """ status, auth_info = await self.streamrip_client._get_auth_status(device_code) - + if status == 0: # Success - apply new tokens self._apply_new_tokens(auth_info) @@ -300,7 +309,8 @@ class SRUtil: # token_expiry is typically an ISO timestamp string if isinstance(token_expiry, str): from datetime import datetime - expiry_dt = datetime.fromisoformat(token_expiry.replace('Z', '+00:00')) + + expiry_dt = datetime.fromisoformat(token_expiry.replace("Z", "+00:00")) expiry_ts = expiry_dt.timestamp() else: expiry_ts = float(token_expiry) @@ -318,14 +328,14 @@ class SRUtil: async def _force_fresh_login(self) -> bool: """Force a complete fresh login, ignoring logged_in state. - + Returns True if login succeeded, False otherwise. """ # Reset the logged_in flag to force a fresh login self.streamrip_client.logged_in = False - + # Close existing session if present - if hasattr(self.streamrip_client, 'session') and self.streamrip_client.session: + if hasattr(self.streamrip_client, "session") and self.streamrip_client.session: try: if not self.streamrip_client.session.closed: await self.streamrip_client.session.close() @@ -333,10 +343,10 @@ class SRUtil: logging.warning("Error closing old session: %s", e) # Use object.__setattr__ to bypass type checking for session reset try: - object.__setattr__(self.streamrip_client, 'session', None) + object.__setattr__(self.streamrip_client, "session", None) except Exception: pass # Session will be recreated on next login - + try: logging.info("Forcing fresh Tidal login...") await self.streamrip_client.login() @@ -345,49 +355,53 @@ class SRUtil: logging.info("Fresh Tidal login successful") return True except Exception as e: - logging.warning("Forced Tidal login failed: %s - device re-auth may be required", e) + logging.warning( + "Forced Tidal login failed: %s - device re-auth may be required", e + ) return False async def _login_and_persist(self, force: bool = False) -> None: """Login to Tidal and persist any refreshed tokens. - + Args: force: If True, force a fresh login even if already logged in. - + This method now checks for: 1. Token expiry - refreshes if token is about to expire 2. Session age - refreshes if session is too old 3. logged_in state - logs in if not logged in - + If refresh fails, logs a warning but does not raise. """ needs_login = force or not self.streamrip_client.logged_in - + # Check if token is expiring soon if not needs_login and self._is_token_expiring_soon(): logging.info("Tidal token expiring soon, will refresh") needs_login = True - + # Check if session is too old if not needs_login and self._is_session_stale(): logging.info("Tidal session is stale, will refresh") needs_login = True - + if not needs_login: return - + try: # Reset logged_in to ensure fresh login attempt if force or self._is_token_expiring_soon(): self.streamrip_client.logged_in = False - + await self.streamrip_client.login() self._last_login_time = time.time() # After login, tokens may have been refreshed - persist them self._save_cached_tokens() logging.info("Tidal login/refresh successful") except Exception as e: - logging.warning("Tidal login/refresh failed: %s - device re-auth may be required", e) + logging.warning( + "Tidal login/refresh failed: %s - device re-auth may be required", e + ) # Don't mark as logged in on failure - let subsequent calls retry async def rate_limited_request(self, func, *args, **kwargs): @@ -397,13 +411,15 @@ class SRUtil: elapsed = now - self.LAST_METADATA_REQUEST if elapsed < self.METADATA_RATE_LIMIT: await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed) - + # Ensure we're logged in before making the request try: await self._login_and_persist() except Exception as e: - logging.warning("Pre-request login failed in rate_limited_request: %s", e) - + logging.warning( + "Pre-request login failed in rate_limited_request: %s", e + ) + result = await func(*args, **kwargs) self.LAST_METADATA_REQUEST = time.time() return result @@ -432,8 +448,11 @@ class SRUtil: try: await self._login_and_persist() except Exception as login_err: - logging.warning("Pre-request login failed: %s (continuing anyway)", login_err) - + logging.warning( + "Pre-request login failed: %s (continuing anyway)", + login_err, + ) + result = await func(*args, **kwargs) # Track successful request self._last_successful_request = time.time() @@ -441,7 +460,12 @@ class SRUtil: except AttributeError as e: # Probably missing/closed client internals: try re-login once last_exc = e - logging.warning("AttributeError in API call (attempt %d/%d): %s", attempt + 1, retries, e) + logging.warning( + "AttributeError in API call (attempt %d/%d): %s", + attempt + 1, + retries, + e, + ) try: await self._force_fresh_login() except Exception: @@ -475,7 +499,10 @@ class SRUtil: # Treat 401 (Unauthorized) as an auth failure: force a fresh re-login then retry is_401_error = ( - (isinstance(e, aiohttp.ClientResponseError) and getattr(e, "status", None) == 401) + ( + isinstance(e, aiohttp.ClientResponseError) + and getattr(e, "status", None) == 401 + ) or "401" in msg or "unauthorized" in msg.lower() ) @@ -491,7 +518,9 @@ class SRUtil: if login_success: logging.info("Forced re-login after 401 successful") else: - logging.warning("Forced re-login after 401 failed - may need device re-auth") + logging.warning( + "Forced re-login after 401 failed - may need device re-auth" + ) except Exception as login_exc: logging.warning("Forced login after 401 failed: %s", login_exc) if attempt < retries - 1: @@ -550,9 +579,7 @@ class SRUtil: title_match = self.is_fuzzy_match(expected_title, found_title, threshold) return artist_match and album_match and title_match - def dedupe_by_key( - self, key: str | list[str], entries: list[dict] - ) -> list[dict]: + def dedupe_by_key(self, key: str | list[str], entries: list[dict]) -> list[dict]: """Return entries de-duplicated by one or more keys.""" keys = [key] if isinstance(key, str) else list(key) @@ -679,9 +706,11 @@ class SRUtil: "upc": album_json.get("upc"), "album_copyright": album_json.get("copyright"), "album_cover_id": album_json.get("cover"), - "album_cover_url": f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg" - if album_json.get("cover") - else None, + "album_cover_url": ( + f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg" + if album_json.get("cover") + else None + ), } # Track-level (overrides or adds to album info) @@ -813,7 +842,9 @@ class SRUtil: return None if not metadata: return None - albums = self.dedupe_by_key(["title", "releaseDate"], metadata.get("albums", [])) + albums = self.dedupe_by_key( + ["title", "releaseDate"], metadata.get("albums", []) + ) albums_out = [ { "artist": ", ".join(artist["name"] for artist in album["artists"]), diff --git a/utils/test.conf b/utils/test.conf deleted file mode 100644 index f7479c3..0000000 --- a/utils/test.conf +++ /dev/null @@ -1,35 +0,0 @@ -# ----------------------- -# /m/m2/ PHP handler -location ~ ^/m/m2/(.+\.php)$ { - alias /storage/music2/completed/; - include fastcgi_params; - fastcgi_pass unix:/run/php/php8.2-fpm.sock; - fastcgi_param SCRIPT_FILENAME /storage/music2/completed/$1; - fastcgi_param DOCUMENT_ROOT /storage/music2/completed; - fastcgi_param SCRIPT_NAME /m/m2/$1; -} - -# /m/m2/ static files -location /m/m2/ { - alias /storage/music2/completed/; - index index.php; - try_files $uri $uri/ /index.php$is_args$args; -} - -# ----------------------- -# /m/ PHP handler -location ~ ^/m/(.+\.php)$ { - root /var/www/codey.lol/new/public; - include fastcgi_params; - fastcgi_pass unix:/run/php/php8.2-fpm.sock; - fastcgi_param SCRIPT_FILENAME $document_root/$1; - fastcgi_param DOCUMENT_ROOT $document_root; - fastcgi_param SCRIPT_NAME /m/$1; -} - -# /m/ static files -location /m/ { - root /var/www/codey.lol/new/public; - index index.php; - try_files $uri $uri/ /m/index.php$is_args$args; -} diff --git a/utils/yt_utils.py b/utils/yt_utils.py index a39cef4..1b42d13 100644 --- a/utils/yt_utils.py +++ b/utils/yt_utils.py @@ -7,19 +7,18 @@ import os VIDEO_PROXY_SECRET = os.environ.get("VIDEO_PROXY_SECRET", "").encode() -def sign_video_id(video_id: Optional[str|bool]) -> str: + +def sign_video_id(video_id: Optional[str | bool]) -> str: """Generate a signed token for a video ID.""" if not VIDEO_PROXY_SECRET or not video_id: return "" # Return empty if no secret configured - + timestamp = int(time.time() * 1000) # milliseconds to match JS Date.now() payload = f"{video_id}:{timestamp}" signature = hmac.new( - VIDEO_PROXY_SECRET, - payload.encode(), - hashlib.sha256 + VIDEO_PROXY_SECRET, payload.encode(), hashlib.sha256 ).hexdigest() - + token_data = f"{payload}:{signature}" # base64url encode (no padding, to match JS base64url) - return base64.urlsafe_b64encode(token_data.encode()).decode().rstrip("=") \ No newline at end of file + return base64.urlsafe_b64encode(token_data.encode()).decode().rstrip("=")