import logging import traceback import time import random from .constructors import ( ValidRadioNextRequest, ValidRadioReshuffleRequest, ValidRadioQueueShiftRequest, ValidRadioQueueRemovalRequest, ValidRadioSongRequest, ValidRadioTypeaheadRequest, ValidRadioQueueRequest, Station ) from utils import radio_util from typing import Optional from fastapi import ( FastAPI, BackgroundTasks, Request, Response, HTTPException, Depends) from fastapi_throttle import RateLimiter from fastapi.responses import RedirectResponse, JSONResponse, FileResponse from auth.deps import get_current_user class Radio(FastAPI): """Radio Endpoints""" def __init__(self, app: FastAPI, my_util, constants, loop) -> None: self.app: FastAPI = app self.util = my_util self.constants = constants self.loop = loop self.radio_util = radio_util.RadioUtil(self.constants, self.loop) self.playlists_loaded: bool = False self.endpoints: dict = { "radio/np": self.radio_now_playing, "radio/request": self.radio_request, "radio/typeahead": self.radio_typeahead, "radio/get_queue": self.radio_get_queue, "radio/skip": self.radio_skip, "radio/queue_shift": self.radio_queue_shift, "radio/reshuffle": self.radio_reshuffle, "radio/queue_remove": self.radio_queue_remove, "radio/ls._next_": self.radio_get_next, "radio/album_art": self.album_art_handler, } for endpoint, handler in self.endpoints.items(): methods: list[str] = ["POST"] if endpoint == "radio/album_art": methods = ["GET"] app.add_api_route( f"/{endpoint}", handler, methods=methods, include_in_schema=True, dependencies=[Depends( RateLimiter(times=25, seconds=2))] if not endpoint == "radio/np" else None, ) app.add_event_handler("startup", self.on_start) async def on_start(self) -> None: stations = ", ".join(self.radio_util.db_queries.keys()) logging.info("radio: Initializing stations:\n%s", stations) await self.radio_util.load_playlists() async def radio_skip( self, data: ValidRadioNextRequest, request: Request, user=Depends(get_current_user) ) -> JSONResponse: """ Skip to the next track in the queue, or to the UUID specified in `skipTo` if provided. Parameters: - **data** (ValidRadioNextRequest): Contains optional UUID to skip to, and station name. - **request** (Request): The HTTP request object. - **user**: Current authenticated user. Returns: - **JSONResponse**: Indicates success or failure of the skip operation. """ if "dj" not in user.get("roles", []): raise HTTPException(status_code=403, detail="Insufficient permissions") try: if data.skipTo: queue_item = self.radio_util.get_queue_item_by_uuid(data.skipTo, data.station) if not queue_item: return JSONResponse( status_code=500, content={ "err": True, "errorText": "No such queue item.", }, ) self.radio_util.active_playlist[data.station] = self.radio_util.active_playlist[data.station][ queue_item[0] : ] skip_result: bool = await self.radio_util._ls_skip(data.station) status_code = 200 if skip_result else 500 return JSONResponse( status_code=status_code, content={ "success": skip_result, }, ) except Exception as e: logging.debug("radio_skip Exception: %s", str(e)) traceback.print_exc() if not isinstance(e, HTTPException): return JSONResponse( status_code=500, content={ "err": True, "errorText": "General failure.", }, ) raise e # Re-raise HTTPException async def radio_reshuffle( self, data: ValidRadioReshuffleRequest, request: Request, user=Depends(get_current_user) ) -> JSONResponse: """ Reshuffle the play queue. Parameters: - **data** (ValidRadioReshuffleRequest): Contains the station name. - **request** (Request): The HTTP request object. - **user**: Current authenticated user. Returns: - **JSONResponse**: Indicates success of the reshuffle operation. """ if "dj" not in user.get("roles", []): raise HTTPException(status_code=403, detail="Insufficient permissions") random.shuffle(self.radio_util.active_playlist[data.station]) return JSONResponse(content={"ok": True}) async def radio_get_queue( self, request: Request, data: Optional[ValidRadioQueueRequest] = None, ) -> JSONResponse: """ Get the current play queue (paged, 20 results per page). Parameters: - **request** (Request): The HTTP request object. - **data** (Optional[ValidRadioQueueRequest]): Contains the station name and optional search query. Returns: - **JSONResponse**: Contains the paged queue data. """ if not (data and data.station): return JSONResponse(status_code=500, content={ "err": True, "errorText": "Invalid request.", }) search: Optional[str] = None draw: int = 0 if isinstance(data, ValidRadioQueueRequest): search = data.search draw = data.draw or 0 start: int = int(data.start or 0) end: int = start + 20 else: start: int = 0 end: int = 20 orig_queue: list[dict] = self.radio_util.active_playlist[data.station] if not search: queue_full: Optional[list] = orig_queue else: queue_full = self.radio_util.datatables_search(search, data.station) if not queue_full: return JSONResponse( status_code=500, content={ "err": True, "errorText": "No queue found.", } ) queue: list = queue_full[start:end] queue_out: list[dict] = [] for x, item in enumerate(queue): queue_out.append( { "pos": orig_queue.index(item), "id": item.get("id"), "uuid": item.get("uuid"), "artist": item.get("artist"), "song": item.get("song"), "album": item.get("album", "N/A"), "genre": item.get("genre", "N/A"), "artistsong": item.get("artistsong"), "duration": item.get("duration"), } ) full_playlist_len: int = len(orig_queue) filtered_len: int = len(queue_full) out_json = { "draw": draw, "recordsTotal": full_playlist_len, "recordsFiltered": filtered_len, "items": queue_out, } return JSONResponse(content=out_json) async def radio_queue_shift( self, data: ValidRadioQueueShiftRequest, request: Request, user=Depends(get_current_user) ) -> JSONResponse: """ Shift the position of a UUID within the queue. Parameters: - **data** (ValidRadioQueueShiftRequest): Contains the UUID to shift, and station name. - **request** (Request): The HTTP request object. - **user**: Current authenticated user. Returns: - **JSONResponse**: Indicates success of the shift operation. """ if "dj" not in user.get("roles", []): raise HTTPException(status_code=403, detail="Insufficient permissions") queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid, data.station) if not queue_item: return JSONResponse( status_code=500, content={ "err": True, "errorText": "Queue item not found.", }, ) (x, item) = queue_item self.radio_util.active_playlist[data.station].pop(x) self.radio_util.active_playlist[data.station].insert(0, item) if not data.next: await self.radio_util._ls_skip(data.station) return JSONResponse( content={ "ok": True, } ) async def radio_queue_remove( self, data: ValidRadioQueueRemovalRequest, request: Request, user=Depends(get_current_user) ) -> JSONResponse: """ Remove an item from the current play queue. Parameters: - **data** (ValidRadioQueueRemovalRequest): Contains the UUID of the item to remove, and station name. - **request** (Request): The HTTP request object. - **user**: Current authenticated user. Returns: - **JSONResponse**: Indicates success of the removal operation. """ if "dj" not in user.get("roles", []): raise HTTPException(status_code=403, detail="Insufficient permissions") queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid, data.station) if not queue_item: return JSONResponse( status_code=500, content={ "err": True, "errorText": "Queue item not found.", }, ) self.radio_util.active_playlist[data.station].pop(queue_item[0]) return JSONResponse( content={ "ok": True, } ) async def album_art_handler( self, request: Request, track_id: Optional[int] = None, station: Station = "main" ) -> Response: """ Get album art for the current or specified track. Parameters: - **request** (Request): The HTTP request object. - **track_id** (Optional[int]): ID of the track to retrieve album art for. Defaults to the current track. - **station** (Station): Name of the station. Defaults to "main". Returns: - **Response**: Contains the album art image or a default image. """ try: if not track_id: track_id = self.radio_util.now_playing[station].get("id") if not track_id: # Still no track ID return JSONResponse(status_code=500, content={ "err": True, "errorText": "Invalid request", }) logging.debug("Seeking album art with trackId: %s", track_id) album_art: Optional[bytes] = self.radio_util.get_album_art( track_id=track_id ) if not album_art: return FileResponse( path="/var/www/codey.lol/new/public/images/radio_art_default.jpg", ) return Response(content=album_art, media_type="image/png") except Exception as e: logging.debug("album_art_handler Exception: %s", str(e)) traceback.print_exc() return RedirectResponse( url="https://codey.lol/images/radio_art_default.jpg", status_code=302 ) async def radio_now_playing(self, request: Request, station: Station = "main") -> JSONResponse: """ Get information about the currently playing track. Parameters: - **request** (Request): The HTTP request object. - **station** (Station): Name of the station. Defaults to "main". Returns: - **JSONResponse**: Contains the track information. """ ret_obj: dict = {**self.radio_util.now_playing[station]} ret_obj["station"] = station try: ret_obj["elapsed"] = int(time.time()) - ret_obj["start"] except KeyError: traceback.print_exc() ret_obj["elapsed"] = 0 ret_obj.pop("file_path") return JSONResponse(content=ret_obj) async def radio_get_next( self, data: ValidRadioNextRequest, request: Request, background_tasks: BackgroundTasks, user=Depends(get_current_user), ) -> JSONResponse: """ Get the next track in the queue. The track will be removed from the queue in the process. Parameters: - **data** (ValidRadioNextRequest): Contains optional UUID to skip to, and station name. - **request** (Request): The HTTP request object. - **background_tasks** (BackgroundTasks): Background tasks for webhook execution. - **user**: Current authenticated user. Returns: - **JSONResponse**: Contains the next track information. """ if "dj" not in user.get("roles", []): raise HTTPException(status_code=403, detail="Insufficient permissions") logging.info("Radio get next") if data.station not in self.radio_util.active_playlist.keys(): raise HTTPException(status_code=500, detail="No such station/not ready") if ( not isinstance(self.radio_util.active_playlist[data.station], list) or not self.radio_util.active_playlist[data.station] ): if self.radio_util.playlists_loaded: self.radio_util.playlists_loaded = False await self.on_start() return JSONResponse( status_code=500, content={ "err": True, "errorText": "General failure occurred, prompting playlist reload.", }, ) next = self.radio_util.active_playlist[data.station].pop(0) if not isinstance(next, dict): logging.critical("next is of type: %s, reloading playlist...", type(next)) await self.on_start() return JSONResponse( status_code=500, content={ "err": True, "errorText": "General failure occurred, prompting playlist reload.", }, ) duration: int = next["duration"] time_started: int = int(time.time()) time_ends: int = int(time_started + duration) self.radio_util.active_playlist[data.station].append(next) # Push to end of playlist self.radio_util.now_playing[data.station] = next next["start"] = time_started next["end"] = time_ends try: background_tasks.add_task(self.radio_util.webhook_song_change, next, data.station) except Exception as e: logging.info("radio_get_next Exception: %s", str(e)) traceback.print_exc() try: album_art = self.radio_util.get_album_art(track_id=next["id"]) if not album_art: self.radio_util.cache_album_art(next["id"], next["file_path"]) except Exception as e: logging.info("radio_get_next Exception: %s", str(e)) traceback.print_exc() return JSONResponse(content=next) async def radio_request( self, data: ValidRadioSongRequest, request: Request, user=Depends(get_current_user) ) -> JSONResponse: """ Handle song requests. Parameters: - **data** (ValidRadioSongRequest): Contains artist, song, and station name. - **request** (Request): The HTTP request object. - **user**: Current authenticated user. Returns: - **JSONResponse**: Indicates success or failure of the request. """ if "dj" not in user.get("roles", []): raise HTTPException(status_code=403, detail="Insufficient permissions") artistsong: Optional[str] = data.artistsong artist: Optional[str] = data.artist song: Optional[str] = data.song if artistsong and (artist or song): return JSONResponse( status_code=500, content={ "err": True, "errorText": "Invalid request", }, ) if not artistsong and (not artist or not song): return JSONResponse( status_code=500, content={ "err": True, "errorText": "Invalid request", }, ) search: bool = self.radio_util.search_db( artistsong=artistsong, artist=artist, song=song, station=data.station ) if data.alsoSkip: await self.radio_util._ls_skip(data.station) return JSONResponse(content={"result": search}) def radio_typeahead( self, data: ValidRadioTypeaheadRequest, request: Request, user=Depends(get_current_user) ) -> JSONResponse: """ Handle typeahead queries for the radio. Parameters: - **data** (ValidRadioTypeaheadRequest): Contains the typeahead query. - **request** (Request): The HTTP request object. - **user**: Current authenticated user. Returns: - **JSONResponse**: Contains the typeahead results. """ if "dj" not in user.get("roles", []): raise HTTPException(status_code=403, detail="Insufficient permissions") if not isinstance(data.query, str): return JSONResponse( status_code=500, content={ "err": True, "errorText": "Invalid request.", }, ) typeahead: Optional[list[str]] = self.radio_util.trackdb_typeahead(data.query) if not typeahead: return JSONResponse(content=[]) return JSONResponse(content=typeahead)