diff --git a/endpoints/radio.py b/endpoints/radio.py index 1ec87be..fbb3183 100644 --- a/endpoints/radio.py +++ b/endpoints/radio.py @@ -2,6 +2,9 @@ import logging import traceback import time import random +import json +import asyncio +from typing import Dict, Set from .constructors import ( ValidRadioNextRequest, ValidRadioReshuffleRequest, @@ -13,6 +16,8 @@ from .constructors import ( Station ) from utils import radio_util +from utils.sr_wrapper import SRUtil +from lyric_search.sources.lrclib import LRCLib from typing import Optional from fastapi import ( FastAPI, @@ -20,7 +25,9 @@ from fastapi import ( Request, Response, HTTPException, - Depends) + Depends, + WebSocket, + WebSocketDisconnect) from fastapi_throttle import RateLimiter from fastapi.responses import RedirectResponse, JSONResponse, FileResponse from auth.deps import get_current_user @@ -34,7 +41,12 @@ class Radio(FastAPI): self.constants = constants self.loop = loop self.radio_util = radio_util.RadioUtil(self.constants, self.loop) + self.sr_util = SRUtil() + self.lrclib = LRCLib() + self.lrc_cache: Dict[str, Optional[str]] = {} self.playlists_loaded: bool = False + # WebSocket connection management + self.active_connections: Dict[str, Set[WebSocket]] = {} self.endpoints: dict = { "radio/np": self.radio_now_playing, "radio/request": self.radio_request, @@ -58,6 +70,13 @@ class Radio(FastAPI): RateLimiter(times=25, seconds=2))] if not endpoint == "radio/np" else None, ) + # Add WebSocket route + async def websocket_route_handler(websocket: WebSocket): + station = websocket.path_params.get("station", "main") + await self.websocket_endpoint_handler(websocket, station) + + app.add_websocket_route("/radio/ws/{station}", websocket_route_handler) + app.add_event_handler("startup", self.on_start) async def on_start(self) -> None: @@ -406,6 +425,8 @@ class Radio(FastAPI): next["end"] = time_ends try: background_tasks.add_task(self.radio_util.webhook_song_change, next, data.station) + # Broadcast track change to WebSocket clients + background_tasks.add_task(self.broadcast_track_change, data.station, next.copy()) except Exception as e: logging.info("radio_get_next Exception: %s", str(e)) traceback.print_exc() @@ -464,7 +485,7 @@ class Radio(FastAPI): return JSONResponse(content={"result": search}) def radio_typeahead( - self, data: ValidRadioTypeaheadRequest, request: Request, user=Depends(get_current_user) + self, data: ValidRadioTypeaheadRequest, request: Request ) -> JSONResponse: """ Handle typeahead queries for the radio. @@ -472,13 +493,13 @@ class Radio(FastAPI): Parameters: - **data** (ValidRadioTypeaheadRequest): Contains the typeahead query. - **request** (Request): The HTTP request object. - - **user**: Current authenticated user. + # - **user**: Current authenticated user. Returns: - **JSONResponse**: Contains the typeahead results. """ - if "dj" not in user.get("roles", []): - raise HTTPException(status_code=403, detail="Insufficient permissions") + # if "dj" not in user.get("roles", []): + # raise HTTPException(status_code=403, detail="Insufficient permissions") if not isinstance(data.query, str): return JSONResponse( @@ -492,3 +513,186 @@ class Radio(FastAPI): if not typeahead: return JSONResponse(content=[]) return JSONResponse(content=typeahead) + + async def websocket_endpoint_handler(self, websocket: WebSocket, station: str): + """ + WebSocket endpoint for real-time radio updates. + + Clients can connect to /radio/ws/{station} to receive: + - Current track info on connect + - Real-time updates when tracks change + + Parameters: + - **websocket** (WebSocket): The WebSocket connection + - **station** (str): The radio station name + """ + await websocket.accept() + + # Initialize connections dict for this station if not exists + if station not in self.active_connections: + self.active_connections[station] = set() + + # Add this connection to the station's connection set + self.active_connections[station].add(websocket) + + try: + # Send current track info immediately on connect + current_track = await self._get_now_playing_data(station) + await websocket.send_text(json.dumps(current_track)) + + # Send LRC asynchronously + asyncio.create_task(self._send_lrc_to_client(websocket, station, current_track)) + + # Keep connection alive and handle incoming messages + while True: + try: + # Wait for messages (optional - could be used for client commands) + data = await websocket.receive_text() + # For now, just echo back a confirmation + await websocket.send_text(json.dumps({"type": "ack", "data": data})) + except WebSocketDisconnect: + break + + except WebSocketDisconnect: + pass + finally: + # Remove connection when client disconnects + if station in self.active_connections: + self.active_connections[station].discard(websocket) + # Clean up empty station sets + if not self.active_connections[station]: + del self.active_connections[station] + + async def _get_now_playing_data(self, station: str) -> dict: + """ + Get now playing data for a specific station. + + Parameters: + - **station** (str): Station name + + Returns: + - **dict**: Current track information + """ + ret_obj: dict = {**self.radio_util.now_playing.get(station, {})} + ret_obj["station"] = station + try: + if "start" in ret_obj: + ret_obj["elapsed"] = int(time.time()) - ret_obj["start"] + else: + ret_obj["elapsed"] = 0 + except KeyError: + ret_obj["elapsed"] = 0 + + # Remove sensitive file path info + ret_obj.pop("file_path", None) + return ret_obj + + async def broadcast_track_change(self, station: str, track_data: dict): + """ + Broadcast track change to all connected WebSocket clients for a station. + + Parameters: + - **station** (str): Station name + - **track_data** (dict): New track information + """ + if station not in self.active_connections: + return + + # Create broadcast message + broadcast_data = { + "type": "track_change", + "data": track_data + } + + # Send to all connected clients for this station + disconnected_clients = set() + for websocket in self.active_connections[station]: + try: + await websocket.send_text(json.dumps(broadcast_data)) + except Exception as e: + logging.warning(f"Failed to send WebSocket message: {e}") + disconnected_clients.add(websocket) + + # Remove failed connections + for websocket in disconnected_clients: + self.active_connections[station].discard(websocket) + + # Broadcast LRC asynchronously + asyncio.create_task(self._broadcast_lrc(station, track_data)) + + async def _send_lrc_to_client(self, websocket: WebSocket, station: str, track_data: dict): + """Send LRC data to a specific client asynchronously.""" + logging.info(f"Sending LRC to client for station {station}") + logging.info(f"Track data: {track_data}") + try: + artist = track_data.get("artist") + title = track_data.get("song") # Changed from "title" to "song" + duration = track_data.get("duration") + + if artist and title: + logging.info(f"Fetching LRC for {artist} - {title} (duration: {duration})") + lrc = await self.sr_util.get_lrc_by_artist_song( + artist, title, duration=duration + ) + if not lrc: + logging.info(f"No LRC from SR, trying LRCLib for {artist} - {title}") + lrclib_result = await self.lrclib.search(artist, title, plain=False) + if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str): + lrc = lrclib_result.lyrics + logging.info("LRC found via LRCLib fallback") + self.lrc_cache[station] = lrc + logging.info(f"LRC fetched: {lrc is not None}") + if lrc: + lrc_data = { + "type": "lrc", + "data": lrc + } + await websocket.send_text(json.dumps(lrc_data)) + logging.info("LRC sent to client") + except Exception as e: + logging.error(f"Failed to send LRC to client: {e}") + + async def _broadcast_lrc(self, station: str, track_data: dict): + """Broadcast LRC data to all connected clients for a station asynchronously.""" + if station not in self.active_connections: + return + + try: + artist = track_data.get("artist") + title = track_data.get("song") # Changed from "title" to "song" + duration = track_data.get("duration") + + if artist and title: + logging.info(f"Broadcasting LRC fetch for {artist} - {title} (duration: {duration})") + lrc = await self.sr_util.get_lrc_by_artist_song( + artist, title, duration=duration + ) + if not lrc: + logging.info(f"No LRC from SR, trying LRCLib for {artist} - {title}") + lrclib_result = await self.lrclib.search(artist, title, plain=False) + if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str): + lrc = lrclib_result.lyrics + logging.info("LRC found via LRCLib fallback") + self.lrc_cache[station] = lrc + logging.info(f"LRC fetched for broadcast: {lrc is not None}") + if lrc: + lrc_data = { + "type": "lrc", + "data": lrc + } + + # Send to all connected clients + disconnected_clients = set() + for websocket in self.active_connections[station]: + try: + await websocket.send_text(json.dumps(lrc_data)) + except Exception as e: + logging.warning(f"Failed to send LRC to client: {e}") + disconnected_clients.add(websocket) + + # Remove failed connections + for websocket in disconnected_clients: + self.active_connections[station].discard(websocket) + logging.info("LRC broadcasted to clients") + except Exception as e: + logging.error(f"Failed to broadcast LRC: {e}") diff --git a/utils/radio_util.py b/utils/radio_util.py index 9ae11e9..336d7fd 100644 --- a/utils/radio_util.py +++ b/utils/radio_util.py @@ -4,6 +4,7 @@ import time import datetime import os import random +import asyncio from uuid import uuid4 as uuid from typing import Union, Optional, Iterable from aiohttp import ClientSession, ClientTimeout @@ -478,10 +479,11 @@ class RadioUtil: playlist, len(self.active_playlist[playlist]), ) - """Loading Complete""" - logging.info(f"Skipping: {playlist}") - await self._ls_skip(playlist) # Request skip from LS to bring streams current - + """Loading Complete""" + # Request skip from LS to bring streams current + for playlist in self.playlists: + logging.info("Skipping: %s", playlist) + await self._ls_skip(playlist) self.playlists_loaded = True except Exception as e: logging.info("Playlist load failed: %s", str(e)) diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py index f0d3791..368781c 100644 --- a/utils/sr_wrapper.py +++ b/utils/sr_wrapper.py @@ -22,12 +22,12 @@ class MetadataFetchError(Exception): # Suppress all logging output from this module and its children for name in [__name__, "utils.sr_wrapper"]: logger = logging.getLogger(name) - logger.setLevel(logging.CRITICAL) + logger.setLevel(logging.INFO) # Temporarily set to INFO for debugging LRC logger.propagate = False for handler in logger.handlers: - handler.setLevel(logging.CRITICAL) + handler.setLevel(logging.INFO) # Also set the root logger to CRITICAL as a last resort (may affect global logging) -logging.getLogger().setLevel(logging.CRITICAL) +# logging.getLogger().setLevel(logging.CRITICAL) load_dotenv() @@ -746,3 +746,55 @@ class SRUtil: except Exception as e: logging.critical("Error: %s", str(e)) return False + + async def get_lrc_by_track_id(self, track_id: int) -> Optional[str]: + """Get LRC lyrics by track ID.""" + logging.info(f"SR: Fetching metadata for track ID {track_id}") + metadata = await self.get_metadata_by_track_id(track_id) + lrc = metadata.get('lyrics') if metadata else None + logging.info(f"SR: LRC {'found' if lrc else 'not found'}") + return lrc + + + + async def get_lrc_by_artist_song( + self, artist: str, song: str, album: Optional[str] = None, duration: Optional[int] = None + ) -> Optional[str]: + """Get LRC lyrics by artist and song, optionally filtering by album and duration.""" + logging.info(f"SR: Searching tracks for {artist} - {song}") + tracks = await self.get_tracks_by_artist_song(artist, song) + logging.info(f"SR: Found {len(tracks) if tracks else 0} tracks") + if not tracks: + return None + + # Filter by album if provided + if album: + tracks = [ + t for t in tracks + if t.get('album', {}).get('title', '').lower() == album.lower() + ] + + if not tracks: + return None + + # If duration provided, select the track with closest duration match + if duration is not None: + tracks_with_diff = [ + (t, abs(t.get('duration', 0) - duration)) for t in tracks + ] + tracks_with_diff.sort(key=lambda x: x[1]) + best_track, min_diff = tracks_with_diff[0] + logging.info(f"SR: Best match duration diff: {min_diff}s") + # If the closest match is more than 5 seconds off, consider no match + if min_diff > 5: + logging.info("SR: Duration diff too large, no match") + return None + else: + best_track = tracks[0] + + track_id = best_track.get('id') + logging.info(f"SR: Using track ID {track_id}") + if not track_id: + return None + + return await self.get_lrc_by_track_id(track_id)