import logging
import traceback
import time
import random
import json
import asyncio
from typing import Dict, Optional, Set

from .constructors import (
    ValidRadioNextRequest,
    ValidRadioReshuffleRequest,
    ValidRadioQueueShiftRequest,
    ValidRadioQueueRemovalRequest,
    ValidRadioSongRequest,
    ValidRadioTypeaheadRequest,
    ValidRadioQueueRequest,
    Station,
)
from utils import radio_util
from utils.sr_wrapper import SRUtil
from lyric_search.sources.lrclib import LRCLib
from fastapi import (
    FastAPI,
    BackgroundTasks,
    Request,
    Response,
    HTTPException,
    Depends,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi_throttle import RateLimiter
from fastapi.responses import RedirectResponse, JSONResponse, FileResponse
from auth.deps import get_current_user


class Radio(FastAPI):
    """Radio Endpoints"""

    # NOTE(review): this class subclasses FastAPI but never calls
    # super().__init__(); every route is registered on the `app` passed in.

    def __init__(self, app: FastAPI, my_util, constants, loop) -> None:
        """
        Register all radio HTTP/WebSocket routes and the startup hook on `app`.

        Parameters:
        - **app** (FastAPI): Application the routes are attached to.
        - **my_util**: Shared utility object (stored, not used here).
        - **constants**: Constants object handed to `RadioUtil`.
        - **loop**: Event loop handed to `RadioUtil`.
        """
        self.app: FastAPI = app
        self.util = my_util
        self.constants = constants
        self.loop = loop
        self.radio_util = radio_util.RadioUtil(self.constants, self.loop)
        self.sr_util = SRUtil()
        self.lrclib = LRCLib()
        # Per-station cache of the LRC text for the current track.
        self.lrc_cache: Dict[str, Optional[str]] = {}
        self.playlists_loaded: bool = False
        # WebSocket connection management
        self.active_connections: Dict[str, Set[WebSocket]] = {}
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak references, so unreferenced tasks may be collected.
        self._background_tasks: Set[asyncio.Task] = set()

        self.endpoints: dict = {
            "radio/np": self.radio_now_playing,
            "radio/request": self.radio_request,
            "radio/typeahead": self.radio_typeahead,
            "radio/get_queue": self.radio_get_queue,
            "radio/skip": self.radio_skip,
            "radio/queue_shift": self.radio_queue_shift,
            "radio/reshuffle": self.radio_reshuffle,
            "radio/queue_remove": self.radio_queue_remove,
            "radio/ls._next_": self.radio_get_next,
            "radio/album_art": self.album_art_handler,
        }

        for endpoint, handler in self.endpoints.items():
            # Album art is the only GET route; everything else is POST.
            methods: list[str] = ["GET"] if endpoint == "radio/album_art" else ["POST"]
            app.add_api_route(
                f"/{endpoint}",
                handler,
                methods=methods,
                include_in_schema=True,
                # The now-playing route is exempt from rate limiting.
                dependencies=[Depends(RateLimiter(times=25, seconds=2))]
                if endpoint != "radio/np"
                else None,
            )

        # Add WebSocket route
        async def websocket_route_handler(websocket: WebSocket):
            station = websocket.path_params.get("station", "main")
            await self.websocket_endpoint_handler(websocket, station)

        app.add_websocket_route("/radio/ws/{station}", websocket_route_handler)
        app.add_event_handler("startup", self.on_start)

    async def on_start(self) -> None:
        """Log the configured stations and (re)load their playlists."""
        stations = ", ".join(self.radio_util.db_queries.keys())
        logging.info("radio: Initializing stations:\n%s", stations)
        await self.radio_util.load_playlists()

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule `coro` as a task and hold a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def radio_skip(
        self,
        data: ValidRadioNextRequest,
        request: Request,
        user=Depends(get_current_user),
    ) -> JSONResponse:
        """
        Skip to the next track in the queue, or to the UUID specified in `skipTo` if provided.

        Parameters:
        - **data** (ValidRadioNextRequest): Contains optional UUID to skip to, and station name.
        - **request** (Request): The HTTP request object.
        - **user**: Current authenticated user.

        Returns:
        - **JSONResponse**: Indicates success or failure of the skip operation.
        """
        if "dj" not in user.get("roles", []):
            raise HTTPException(status_code=403, detail="Insufficient permissions")

        try:
            if data.skipTo:
                queue_item = self.radio_util.get_queue_item_by_uuid(
                    data.skipTo, data.station
                )
                if not queue_item:
                    return JSONResponse(
                        status_code=500,
                        content={
                            "err": True,
                            "errorText": "No such queue item.",
                        },
                    )
                # Drop everything ahead of the requested item so it plays next.
                playlist = self.radio_util.active_playlist[data.station]
                self.radio_util.active_playlist[data.station] = playlist[queue_item[0]:]
            skip_result: bool = await self.radio_util._ls_skip(data.station)
            status_code = 200 if skip_result else 500
            return JSONResponse(
                status_code=status_code,
                content={
                    "success": skip_result,
                },
            )
        except HTTPException:
            raise  # Let FastAPI render HTTP errors unchanged
        except Exception as e:
            logging.debug("radio_skip Exception: %s", str(e))
            traceback.print_exc()
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "General failure.",
                },
            )

    async def radio_reshuffle(
        self,
        data: ValidRadioReshuffleRequest,
        request: Request,
        user=Depends(get_current_user),
    ) -> JSONResponse:
        """
        Reshuffle the play queue.

        Parameters:
        - **data** (ValidRadioReshuffleRequest): Contains the station name.
        - **request** (Request): The HTTP request object.
        - **user**: Current authenticated user.

        Returns:
        - **JSONResponse**: Indicates success of the reshuffle operation.
        """
        if "dj" not in user.get("roles", []):
            raise HTTPException(status_code=403, detail="Insufficient permissions")

        random.shuffle(self.radio_util.active_playlist[data.station])
        return JSONResponse(content={"ok": True})

    async def radio_get_queue(
        self,
        request: Request,
        data: Optional[ValidRadioQueueRequest] = None,
    ) -> JSONResponse:
        """
        Get the current play queue (paged, 20 results per page).

        Parameters:
        - **request** (Request): The HTTP request object.
        - **data** (Optional[ValidRadioQueueRequest]): Contains the station name and optional search query.

        Returns:
        - **JSONResponse**: Contains the paged queue data.
        """
        if not (data and data.station):
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Invalid request.",
                },
            )

        search: Optional[str] = data.search
        draw: int = data.draw or 0
        # A negative offset would make the slice wrap around; clamp it.
        start: int = max(0, int(data.start or 0))
        end: int = start + 20

        orig_queue: list[dict] = self.radio_util.active_playlist[data.station]
        if not search:
            queue_full: Optional[list] = orig_queue
        else:
            queue_full = self.radio_util.datatables_search(search, data.station)
        if not queue_full:
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "No queue found.",
                },
            )

        queue: list = queue_full[start:end]
        queue_out: list[dict] = []
        for offset, item in enumerate(queue):
            # Unfiltered view: the position is simply the slice offset, which
            # is O(1) and stays correct when equal entries appear twice.
            # Filtered view: look the item up in the full playlist.
            pos = orig_queue.index(item) if search else start + offset
            queue_out.append(
                {
                    "pos": pos,
                    "id": item.get("id"),
                    "uuid": item.get("uuid"),
                    "artist": item.get("artist"),
                    "song": item.get("song"),
                    "album": item.get("album", "N/A"),
                    "genre": item.get("genre", "N/A"),
                    "artistsong": item.get("artistsong"),
                    "duration": item.get("duration"),
                }
            )

        out_json = {
            "draw": draw,
            "recordsTotal": len(orig_queue),
            "recordsFiltered": len(queue_full),
            "items": queue_out,
        }
        return JSONResponse(content=out_json)

    async def radio_queue_shift(
        self,
        data: ValidRadioQueueShiftRequest,
        request: Request,
        user=Depends(get_current_user),
    ) -> JSONResponse:
        """
        Shift the position of a UUID within the queue.

        Parameters:
        - **data** (ValidRadioQueueShiftRequest): Contains the UUID to shift, and station name.
        - **request** (Request): The HTTP request object.
        - **user**: Current authenticated user.

        Returns:
        - **JSONResponse**: Indicates success of the shift operation.
        """
        if "dj" not in user.get("roles", []):
            raise HTTPException(status_code=403, detail="Insufficient permissions")

        queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid, data.station)
        if not queue_item:
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Queue item not found.",
                },
            )
        (x, item) = queue_item
        # Move the item to the head of the queue.
        self.radio_util.active_playlist[data.station].pop(x)
        self.radio_util.active_playlist[data.station].insert(0, item)
        if not data.next:
            # Caller did not ask for "play next", so skip to it immediately.
            await self.radio_util._ls_skip(data.station)
        return JSONResponse(
            content={
                "ok": True,
            }
        )

    async def radio_queue_remove(
        self,
        data: ValidRadioQueueRemovalRequest,
        request: Request,
        user=Depends(get_current_user),
    ) -> JSONResponse:
        """
        Remove an item from the current play queue.

        Parameters:
        - **data** (ValidRadioQueueRemovalRequest): Contains the UUID of the item to remove, and station name.
        - **request** (Request): The HTTP request object.
        - **user**: Current authenticated user.

        Returns:
        - **JSONResponse**: Indicates success of the removal operation.
        """
        if "dj" not in user.get("roles", []):
            raise HTTPException(status_code=403, detail="Insufficient permissions")

        queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid, data.station)
        if not queue_item:
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Queue item not found.",
                },
            )
        self.radio_util.active_playlist[data.station].pop(queue_item[0])
        return JSONResponse(
            content={
                "ok": True,
            }
        )

    async def album_art_handler(
        self,
        request: Request,
        track_id: Optional[int] = None,
        station: Station = "main",
    ) -> Response:
        """
        Get album art for the current or specified track.

        Parameters:
        - **request** (Request): The HTTP request object.
        - **track_id** (Optional[int]): ID of the track to retrieve album art for. Defaults to the current track.
        - **station** (Station): Name of the station. Defaults to "main".

        Returns:
        - **Response**: Contains the album art image or a default image.
        """
        try:
            if not track_id:
                track_id = self.radio_util.now_playing[station].get("id")
                if not track_id:
                    # Still no track ID
                    return JSONResponse(
                        status_code=500,
                        content={
                            "err": True,
                            "errorText": "Invalid request",
                        },
                    )
            logging.debug("Seeking album art with trackId: %s", track_id)
            album_art: Optional[bytes] = self.radio_util.get_album_art(
                track_id=track_id
            )
            if not album_art:
                return FileResponse(
                    path="/var/www/codey.lol/new/public/images/radio_art_default.jpg",
                )
            return Response(content=album_art, media_type="image/png")
        except Exception as e:
            # Any failure falls back to the public default image.
            logging.debug("album_art_handler Exception: %s", str(e))
            traceback.print_exc()
            return RedirectResponse(
                url="https://codey.lol/images/radio_art_default.jpg",
                status_code=302,
            )

    async def radio_now_playing(
        self, request: Request, station: Station = "main"
    ) -> JSONResponse:
        """
        Get information about the currently playing track.

        Parameters:
        - **request** (Request): The HTTP request object.
        - **station** (Station): Name of the station. Defaults to "main".

        Returns:
        - **JSONResponse**: Contains the track information.
        """
        # Shares the WebSocket code path so both transports return the same
        # shape and neither fails when "file_path"/"start" is absent.
        ret_obj: dict = await self._get_now_playing_data(station)
        return JSONResponse(content=ret_obj)

    async def radio_get_next(
        self,
        data: ValidRadioNextRequest,
        request: Request,
        background_tasks: BackgroundTasks,
        user=Depends(get_current_user),
    ) -> JSONResponse:
        """
        Get the next track in the queue. The track will be removed from the queue in the process.

        Parameters:
        - **data** (ValidRadioNextRequest): Contains optional UUID to skip to, and station name.
        - **request** (Request): The HTTP request object.
        - **background_tasks** (BackgroundTasks): Background tasks for webhook execution.
        - **user**: Current authenticated user.

        Returns:
        - **JSONResponse**: Contains the next track information.
        """
        if "dj" not in user.get("roles", []):
            raise HTTPException(status_code=403, detail="Insufficient permissions")

        logging.info("Radio get next")
        if data.station not in self.radio_util.active_playlist:
            raise HTTPException(status_code=500, detail="No such station/not ready")

        playlist = self.radio_util.active_playlist[data.station]
        if not isinstance(playlist, list) or not playlist:
            # Empty or corrupt playlist: trigger one reload, then report failure.
            if self.radio_util.playlists_loaded:
                self.radio_util.playlists_loaded = False
                await self.on_start()
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "General failure occurred, prompting playlist reload.",
                },
            )

        next_track = playlist.pop(0)
        if not isinstance(next_track, dict):
            logging.critical(
                "next is of type: %s, reloading playlist...", type(next_track)
            )
            await self.on_start()
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "General failure occurred, prompting playlist reload.",
                },
            )

        duration: int = next_track["duration"]
        time_started: int = int(time.time())
        time_ends: int = int(time_started + duration)

        # Push to end of playlist
        self.radio_util.active_playlist[data.station].append(next_track)

        self.radio_util.now_playing[data.station] = next_track
        next_track["start"] = time_started
        next_track["end"] = time_ends

        # Clear the LRC cache for the station
        self.lrc_cache.pop(data.station, None)

        try:
            background_tasks.add_task(
                self.radio_util.webhook_song_change, next_track, data.station
            )
            # Broadcast track change to WebSocket clients
            background_tasks.add_task(
                self.broadcast_track_change, data.station, next_track.copy()
            )
        except Exception as e:
            logging.info("radio_get_next Exception: %s", str(e))
            traceback.print_exc()

        try:
            # Warm the album-art cache for the new track (best effort).
            album_art = self.radio_util.get_album_art(track_id=next_track["id"])
            if not album_art:
                self.radio_util.cache_album_art(
                    next_track["id"], next_track["file_path"]
                )
        except Exception as e:
            logging.info("radio_get_next Exception: %s", str(e))
            traceback.print_exc()

        return JSONResponse(content=next_track)

    async def radio_request(
        self,
        data: ValidRadioSongRequest,
        request: Request,
        user=Depends(get_current_user),
    ) -> JSONResponse:
        """
        Handle song requests.

        Parameters:
        - **data** (ValidRadioSongRequest): Contains artist, song, and station name.
        - **request** (Request): The HTTP request object.
        - **user**: Current authenticated user.

        Returns:
        - **JSONResponse**: Indicates success or failure of the request.
        """
        if "dj" not in user.get("roles", []):
            raise HTTPException(status_code=403, detail="Insufficient permissions")

        artistsong: Optional[str] = data.artistsong
        artist: Optional[str] = data.artist
        song: Optional[str] = data.song

        # Exactly one form is accepted: `artistsong` alone, or `artist` + `song`.
        if artistsong and (artist or song):
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Invalid request",
                },
            )
        if not artistsong and (not artist or not song):
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Invalid request",
                },
            )

        search: bool = self.radio_util.search_db(
            artistsong=artistsong, artist=artist, song=song, station=data.station
        )
        if data.alsoSkip:
            await self.radio_util._ls_skip(data.station)
        return JSONResponse(content={"result": search})

    def radio_typeahead(
        self, data: ValidRadioTypeaheadRequest, request: Request
    ) -> JSONResponse:
        """
        Handle typeahead queries for the radio.

        Parameters:
        - **data** (ValidRadioTypeaheadRequest): Contains the typeahead query.
        - **request** (Request): The HTTP request object.
        # - **user**: Current authenticated user.

        Returns:
        - **JSONResponse**: Contains the typeahead results.
        """
        # if "dj" not in user.get("roles", []):
        #     raise HTTPException(status_code=403, detail="Insufficient permissions")

        if not isinstance(data.query, str):
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Invalid request.",
                },
            )
        typeahead: Optional[list[str]] = self.radio_util.trackdb_typeahead(data.query)
        if not typeahead:
            return JSONResponse(content=[])
        return JSONResponse(content=typeahead)

    async def websocket_endpoint_handler(self, websocket: WebSocket, station: str):
        """
        WebSocket endpoint for real-time radio updates.

        Clients can connect to /radio/ws/{station} to receive:
        - Current track info on connect
        - Real-time updates when tracks change

        Parameters:
        - **websocket** (WebSocket): The WebSocket connection
        - **station** (str): The radio station name
        """
        await websocket.accept()

        # Register this connection, creating the station's set on first use.
        self.active_connections.setdefault(station, set()).add(websocket)

        try:
            # Send current track info immediately on connect
            current_track = await self._get_now_playing_data(station)
            current_track.pop("file_path", None)  # Ensure file_path is stripped
            await websocket.send_text(json.dumps(current_track))

            # Send LRC asynchronously
            self._spawn(self._send_lrc_to_client(websocket, station, current_track))

            # Keep connection alive and handle incoming messages
            while True:
                try:
                    # Wait for messages (optional - could be used for client commands)
                    data = await websocket.receive_text()
                    # For now, just echo back a confirmation
                    await websocket.send_text(
                        json.dumps({"type": "ack", "data": data})
                    )
                except WebSocketDisconnect:
                    break
        except WebSocketDisconnect:
            pass
        finally:
            # Remove connection when client disconnects
            connections = self.active_connections.get(station)
            if connections is not None:
                connections.discard(websocket)
                # Clean up empty station sets
                if not connections:
                    del self.active_connections[station]

    async def _get_now_playing_data(self, station: str) -> dict:
        """
        Get now playing data for a specific station.

        Parameters:
        - **station** (str): Station name

        Returns:
        - **dict**: Current track information, plus `station` and `elapsed`
          (seconds since `start`, or 0 when no start time is recorded).
          `file_path` is never included.
        """
        # Shallow copy so the stored now-playing entry is left untouched.
        ret_obj: dict = {**self.radio_util.now_playing.get(station, {})}
        ret_obj["station"] = station
        if "start" in ret_obj:
            ret_obj["elapsed"] = int(time.time()) - ret_obj["start"]
        else:
            ret_obj["elapsed"] = 0
        # Remove sensitive file path info
        ret_obj.pop("file_path", None)
        return ret_obj

    async def _send_to_station(
        self, station: str, payload: str, failure_msg: str
    ) -> None:
        """
        Send `payload` to every client of `station`, dropping clients that fail.

        Parameters:
        - **station** (str): Station name
        - **payload** (str): Already-serialized message text
        - **failure_msg** (str): Prefix for the warning logged on a failed send
        """
        clients = self.active_connections.get(station)
        if not clients:
            return

        failed: list = []
        # Iterate a snapshot: clients may connect/disconnect while we await,
        # and mutating a set during iteration raises RuntimeError.
        for client in tuple(clients):
            try:
                await client.send_text(payload)
            except Exception as e:
                logging.warning("%s: %s", failure_msg, e)
                failed.append(client)

        # Look the set up again; the station entry may have been removed by a
        # disconnecting client while the sends were in flight.
        remaining = self.active_connections.get(station)
        if remaining is not None:
            remaining.difference_update(failed)

    async def broadcast_track_change(self, station: str, track_data: dict):
        """
        Broadcast track change to all connected WebSocket clients for a station.

        Parameters:
        - **station** (str): Station name
        - **track_data** (dict): New track information
        """
        if station not in self.active_connections:
            return

        # Remove sensitive file path info (without mutating the caller's dict)
        public_track = {k: v for k, v in track_data.items() if k != "file_path"}

        # Create broadcast message
        broadcast_data = {"type": "track_change", "data": public_track}

        # Send to all connected clients for this station
        await self._send_to_station(
            station, json.dumps(broadcast_data), "Failed to send WebSocket message"
        )

        # Broadcast LRC asynchronously
        self._spawn(self._broadcast_lrc(station, public_track))

    async def _fetch_lrc(
        self, artist: str, title: str, duration: Optional[int]
    ) -> tuple:
        """
        Look up LRC text via SR first, then LRCLib as a fallback.

        Returns:
        - **tuple**: `(lrc, source)`; `lrc` is falsy when nothing was found and
          `source` is "SR" or "LRCLib".
        """
        lrc: Optional[str] = await self.sr_util.get_lrc_by_artist_song(
            artist, title, duration=duration
        )
        if lrc:
            return lrc, "SR"

        logging.info("No LRC from SR, trying LRCLib for %s - %s", artist, title)
        lrclib_result = await self.lrclib.search(artist, title, plain=False)
        if (
            lrclib_result
            and lrclib_result.lyrics
            and isinstance(lrclib_result.lyrics, str)
        ):
            logging.info("LRC found via LRCLib fallback")
            return lrclib_result.lyrics, "LRCLib"
        return lrc, "SR"

    async def _send_lrc_to_client(
        self, websocket: WebSocket, station: str, track_data: dict
    ):
        """Send LRC data to a specific client asynchronously."""
        logging.info("Sending LRC to client for station %s", station)
        logging.info("Track data: %s", track_data)
        try:
            # Always check if LRC is already cached
            cached_lrc = self.lrc_cache.get(station)
            if cached_lrc:
                logging.info("Using cached LRC for client")
                await websocket.send_text(
                    json.dumps({"type": "lrc", "data": cached_lrc, "source": "Cache"})
                )
                return

            # Fetch LRC if not cached
            artist: Optional[str] = track_data.get("artist")
            title: Optional[str] = track_data.get("song")
            duration: Optional[int] = track_data.get("duration")

            if artist and title:
                logging.info(
                    "Fetching LRC for %s - %s (duration: %s)", artist, title, duration
                )
                lrc, source = await self._fetch_lrc(artist, title, duration)
                if lrc:
                    self.lrc_cache[station] = lrc  # Cache the LRC regardless of source
                    await websocket.send_text(
                        json.dumps({"type": "lrc", "data": lrc, "source": source})
                    )
                    logging.info("LRC sent to client")
                else:
                    logging.info("No LRC found from any source.")
        except Exception as e:
            logging.error("Failed to send LRC to client: %s", e)

    async def _broadcast_lrc(self, station: str, track_data: dict):
        """Broadcast LRC data to all connected clients for a station asynchronously."""
        if station not in self.active_connections:
            return

        try:
            # Clear the LRC cache for the station on track change
            self.lrc_cache.pop(station, None)

            artist: Optional[str] = track_data.get("artist")
            title: Optional[str] = track_data.get("song")
            duration: Optional[int] = track_data.get("duration")

            if artist and title:
                logging.info(
                    "Broadcasting LRC fetch for %s - %s (duration: %s)",
                    artist,
                    title,
                    duration,
                )
                lrc, source = await self._fetch_lrc(artist, title, duration)
                self.lrc_cache[station] = lrc  # Cache the LRC
                logging.info("LRC fetched for broadcast: %s", lrc is not None)
                if lrc:
                    lrc_data: dict = {"type": "lrc", "data": lrc, "source": source}
                    # Send to all connected clients
                    await self._send_to_station(
                        station, json.dumps(lrc_data), "Failed to send LRC to client"
                    )
                    logging.info("LRC broadcasted to clients")
        except Exception as e:
            logging.error("Failed to broadcast LRC: %s", e)