Files
api/endpoints/radio.py

743 lines
29 KiB
Python

import logging
import traceback
import time
import random
import json
import asyncio
from typing import Dict, Set
from .constructors import (
ValidRadioNextRequest,
ValidRadioReshuffleRequest,
ValidRadioQueueShiftRequest,
ValidRadioQueueRemovalRequest,
ValidRadioSongRequest,
ValidRadioTypeaheadRequest,
ValidRadioQueueRequest,
Station
)
from utils import radio_util
from utils.sr_wrapper import SRUtil
from lyric_search.sources.lrclib import LRCLib
from typing import Optional
from fastapi import (
FastAPI,
BackgroundTasks,
Request,
Response,
HTTPException,
Depends,
WebSocket,
WebSocketDisconnect)
from fastapi_throttle import RateLimiter
from fastapi.responses import RedirectResponse, JSONResponse, FileResponse
from auth.deps import get_current_user
class Radio(FastAPI):
"""Radio Endpoints"""
def __init__(self, app: FastAPI, my_util, constants, loop) -> None:
self.app: FastAPI = app
self.util = my_util
self.constants = constants
self.loop = loop
self.radio_util = radio_util.RadioUtil(self.constants, self.loop)
self.sr_util = SRUtil()
self.lrclib = LRCLib()
self.lrc_cache: Dict[str, Optional[str]] = {}
self.playlists_loaded: bool = False
# WebSocket connection management
self.active_connections: Dict[str, Set[WebSocket]] = {}
self.endpoints: dict = {
"radio/np": self.radio_now_playing,
"radio/request": self.radio_request,
"radio/typeahead": self.radio_typeahead,
"radio/get_queue": self.radio_get_queue,
"radio/skip": self.radio_skip,
"radio/queue_shift": self.radio_queue_shift,
"radio/reshuffle": self.radio_reshuffle,
"radio/queue_remove": self.radio_queue_remove,
"radio/ls._next_": self.radio_get_next,
"radio/album_art": self.album_art_handler,
}
for endpoint, handler in self.endpoints.items():
methods: list[str] = ["POST"]
if endpoint == "radio/album_art":
methods = ["GET"]
app.add_api_route(
f"/{endpoint}", handler, methods=methods, include_in_schema=True,
dependencies=[Depends(
RateLimiter(times=25, seconds=2))] if not endpoint == "radio/np" else None,
)
# Add WebSocket route
async def websocket_route_handler(websocket: WebSocket):
station = websocket.path_params.get("station", "main")
await self.websocket_endpoint_handler(websocket, station)
app.add_websocket_route("/radio/ws/{station}", websocket_route_handler)
app.add_event_handler("startup", self.on_start)
async def on_start(self) -> None:
stations = ", ".join(self.radio_util.db_queries.keys())
logging.info("radio: Initializing stations:\n%s", stations)
await self.radio_util.load_playlists()
async def radio_skip(
self, data: ValidRadioNextRequest, request: Request, user=Depends(get_current_user)
) -> JSONResponse:
"""
Skip to the next track in the queue, or to the UUID specified in `skipTo` if provided.
Parameters:
- **data** (ValidRadioNextRequest): Contains optional UUID to skip to, and station name.
- **request** (Request): The HTTP request object.
- **user**: Current authenticated user.
Returns:
- **JSONResponse**: Indicates success or failure of the skip operation.
"""
if "dj" not in user.get("roles", []):
raise HTTPException(status_code=403, detail="Insufficient permissions")
try:
if data.skipTo:
queue_item = self.radio_util.get_queue_item_by_uuid(data.skipTo, data.station)
if not queue_item:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "No such queue item.",
},
)
self.radio_util.active_playlist[data.station] = self.radio_util.active_playlist[data.station][
queue_item[0] :
]
skip_result: bool = await self.radio_util._ls_skip(data.station)
status_code = 200 if skip_result else 500
return JSONResponse(
status_code=status_code,
content={
"success": skip_result,
},
)
except Exception as e:
logging.debug("radio_skip Exception: %s", str(e))
traceback.print_exc()
if not isinstance(e, HTTPException):
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "General failure.",
},
)
raise e # Re-raise HTTPException
async def radio_reshuffle(
self, data: ValidRadioReshuffleRequest, request: Request, user=Depends(get_current_user)
) -> JSONResponse:
"""
Reshuffle the play queue.
Parameters:
- **data** (ValidRadioReshuffleRequest): Contains the station name.
- **request** (Request): The HTTP request object.
- **user**: Current authenticated user.
Returns:
- **JSONResponse**: Indicates success of the reshuffle operation.
"""
if "dj" not in user.get("roles", []):
raise HTTPException(status_code=403, detail="Insufficient permissions")
random.shuffle(self.radio_util.active_playlist[data.station])
return JSONResponse(content={"ok": True})
async def radio_get_queue(
self,
request: Request,
data: Optional[ValidRadioQueueRequest] = None,
) -> JSONResponse:
"""
Get the current play queue (paged, 20 results per page).
Parameters:
- **request** (Request): The HTTP request object.
- **data** (Optional[ValidRadioQueueRequest]): Contains the station name and optional search query.
Returns:
- **JSONResponse**: Contains the paged queue data.
"""
if not (data and data.station):
return JSONResponse(status_code=500,
content={
"err": True,
"errorText": "Invalid request.",
})
search: Optional[str] = None
draw: int = 0
if isinstance(data, ValidRadioQueueRequest):
search = data.search
draw = data.draw or 0
start: int = int(data.start or 0)
end: int = start + 20
else:
start: int = 0
end: int = 20
orig_queue: list[dict] = self.radio_util.active_playlist[data.station]
if not search:
queue_full: Optional[list] = orig_queue
else:
queue_full = self.radio_util.datatables_search(search, data.station)
if not queue_full:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "No queue found.",
}
)
queue: list = queue_full[start:end]
queue_out: list[dict] = []
for x, item in enumerate(queue):
queue_out.append(
{
"pos": orig_queue.index(item),
"id": item.get("id"),
"uuid": item.get("uuid"),
"artist": item.get("artist"),
"song": item.get("song"),
"album": item.get("album", "N/A"),
"genre": item.get("genre", "N/A"),
"artistsong": item.get("artistsong"),
"duration": item.get("duration"),
}
)
full_playlist_len: int = len(orig_queue)
filtered_len: int = len(queue_full)
out_json = {
"draw": draw,
"recordsTotal": full_playlist_len,
"recordsFiltered": filtered_len,
"items": queue_out,
}
return JSONResponse(content=out_json)
async def radio_queue_shift(
self, data: ValidRadioQueueShiftRequest, request: Request, user=Depends(get_current_user)
) -> JSONResponse:
"""
Shift the position of a UUID within the queue.
Parameters:
- **data** (ValidRadioQueueShiftRequest): Contains the UUID to shift, and station name.
- **request** (Request): The HTTP request object.
- **user**: Current authenticated user.
Returns:
- **JSONResponse**: Indicates success of the shift operation.
"""
if "dj" not in user.get("roles", []):
raise HTTPException(status_code=403, detail="Insufficient permissions")
queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid, data.station)
if not queue_item:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Queue item not found.",
},
)
(x, item) = queue_item
self.radio_util.active_playlist[data.station].pop(x)
self.radio_util.active_playlist[data.station].insert(0, item)
if not data.next:
await self.radio_util._ls_skip(data.station)
return JSONResponse(
content={
"ok": True,
}
)
async def radio_queue_remove(
self, data: ValidRadioQueueRemovalRequest, request: Request, user=Depends(get_current_user)
) -> JSONResponse:
"""
Remove an item from the current play queue.
Parameters:
- **data** (ValidRadioQueueRemovalRequest): Contains the UUID of the item to remove, and station name.
- **request** (Request): The HTTP request object.
- **user**: Current authenticated user.
Returns:
- **JSONResponse**: Indicates success of the removal operation.
"""
if "dj" not in user.get("roles", []):
raise HTTPException(status_code=403, detail="Insufficient permissions")
queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid, data.station)
if not queue_item:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Queue item not found.",
},
)
self.radio_util.active_playlist[data.station].pop(queue_item[0])
return JSONResponse(
content={
"ok": True,
}
)
async def album_art_handler(
self, request: Request, track_id: Optional[int] = None,
station: Station = "main"
) -> Response:
"""
Get album art for the current or specified track.
Parameters:
- **request** (Request): The HTTP request object.
- **track_id** (Optional[int]): ID of the track to retrieve album art for. Defaults to the current track.
- **station** (Station): Name of the station. Defaults to "main".
Returns:
- **Response**: Contains the album art image or a default image.
"""
try:
if not track_id:
track_id = self.radio_util.now_playing[station].get("id")
if not track_id:
# Still no track ID
return JSONResponse(status_code=500,
content={
"err": True,
"errorText": "Invalid request",
})
logging.debug("Seeking album art with trackId: %s", track_id)
album_art: Optional[bytes] = self.radio_util.get_album_art(
track_id=track_id
)
if not album_art:
return FileResponse(
path="/var/www/codey.lol/new/public/images/radio_art_default.jpg",
)
return Response(content=album_art, media_type="image/png")
except Exception as e:
logging.debug("album_art_handler Exception: %s", str(e))
traceback.print_exc()
return RedirectResponse(
url="https://codey.lol/images/radio_art_default.jpg", status_code=302
)
async def radio_now_playing(self, request: Request,
station: Station = "main") -> JSONResponse:
"""
Get information about the currently playing track.
Parameters:
- **request** (Request): The HTTP request object.
- **station** (Station): Name of the station. Defaults to "main".
Returns:
- **JSONResponse**: Contains the track information.
"""
ret_obj: dict = {**self.radio_util.now_playing[station]}
ret_obj["station"] = station
try:
ret_obj["elapsed"] = int(time.time()) - ret_obj["start"]
except KeyError:
traceback.print_exc()
ret_obj["elapsed"] = 0
ret_obj.pop("file_path")
return JSONResponse(content=ret_obj)
async def radio_get_next(
self,
data: ValidRadioNextRequest,
request: Request,
background_tasks: BackgroundTasks,
user=Depends(get_current_user),
) -> JSONResponse:
"""
Get the next track in the queue. The track will be removed from the queue in the process.
Parameters:
- **data** (ValidRadioNextRequest): Contains optional UUID to skip to, and station name.
- **request** (Request): The HTTP request object.
- **background_tasks** (BackgroundTasks): Background tasks for webhook execution.
- **user**: Current authenticated user.
Returns:
- **JSONResponse**: Contains the next track information.
"""
if "dj" not in user.get("roles", []):
raise HTTPException(status_code=403, detail="Insufficient permissions")
logging.info("Radio get next")
if data.station not in self.radio_util.active_playlist.keys():
raise HTTPException(status_code=500, detail="No such station/not ready")
if (
not isinstance(self.radio_util.active_playlist[data.station], list)
or not self.radio_util.active_playlist[data.station]
):
if self.radio_util.playlists_loaded:
self.radio_util.playlists_loaded = False
await self.on_start()
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "General failure occurred, prompting playlist reload.",
},
)
next = self.radio_util.active_playlist[data.station].pop(0)
if not isinstance(next, dict):
logging.critical("next is of type: %s, reloading playlist...", type(next))
await self.on_start()
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "General failure occurred, prompting playlist reload.",
},
)
duration: int = next["duration"]
time_started: int = int(time.time())
time_ends: int = int(time_started + duration)
self.radio_util.active_playlist[data.station].append(next) # Push to end of playlist
self.radio_util.now_playing[data.station] = next
next["start"] = time_started
next["end"] = time_ends
try:
background_tasks.add_task(self.radio_util.webhook_song_change, next, data.station)
# Broadcast track change to WebSocket clients
background_tasks.add_task(self.broadcast_track_change, data.station, next.copy())
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
try:
album_art = self.radio_util.get_album_art(track_id=next["id"])
if not album_art:
self.radio_util.cache_album_art(next["id"], next["file_path"])
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
return JSONResponse(content=next)
async def radio_request(
self, data: ValidRadioSongRequest, request: Request, user=Depends(get_current_user)
) -> JSONResponse:
"""
Handle song requests.
Parameters:
- **data** (ValidRadioSongRequest): Contains artist, song, and station name.
- **request** (Request): The HTTP request object.
- **user**: Current authenticated user.
Returns:
- **JSONResponse**: Indicates success or failure of the request.
"""
if "dj" not in user.get("roles", []):
raise HTTPException(status_code=403, detail="Insufficient permissions")
artistsong: Optional[str] = data.artistsong
artist: Optional[str] = data.artist
song: Optional[str] = data.song
if artistsong and (artist or song):
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Invalid request",
},
)
if not artistsong and (not artist or not song):
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Invalid request",
},
)
search: bool = self.radio_util.search_db(
artistsong=artistsong, artist=artist, song=song, station=data.station
)
if data.alsoSkip:
await self.radio_util._ls_skip(data.station)
return JSONResponse(content={"result": search})
def radio_typeahead(
self, data: ValidRadioTypeaheadRequest, request: Request
) -> JSONResponse:
"""
Handle typeahead queries for the radio.
Parameters:
- **data** (ValidRadioTypeaheadRequest): Contains the typeahead query.
- **request** (Request): The HTTP request object.
# - **user**: Current authenticated user.
Returns:
- **JSONResponse**: Contains the typeahead results.
"""
# if "dj" not in user.get("roles", []):
# raise HTTPException(status_code=403, detail="Insufficient permissions")
if not isinstance(data.query, str):
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Invalid request.",
},
)
typeahead: Optional[list[str]] = self.radio_util.trackdb_typeahead(data.query)
if not typeahead:
return JSONResponse(content=[])
return JSONResponse(content=typeahead)
async def websocket_endpoint_handler(self, websocket: WebSocket, station: str):
"""
WebSocket endpoint for real-time radio updates.
Clients can connect to /radio/ws/{station} to receive:
- Current track info on connect
- Real-time updates when tracks change
Parameters:
- **websocket** (WebSocket): The WebSocket connection
- **station** (str): The radio station name
"""
await websocket.accept()
# Initialize connections dict for this station if not exists
if station not in self.active_connections:
self.active_connections[station] = set()
# Add this connection to the station's connection set
self.active_connections[station].add(websocket)
try:
# Send current track info immediately on connect
current_track = await self._get_now_playing_data(station)
current_track.pop("file_path", None) # Ensure file_path is stripped
await websocket.send_text(json.dumps(current_track))
# Send LRC asynchronously
asyncio.create_task(self._send_lrc_to_client(websocket, station, current_track))
# Keep connection alive and handle incoming messages
while True:
try:
# Wait for messages (optional - could be used for client commands)
data = await websocket.receive_text()
# For now, just echo back a confirmation
await websocket.send_text(json.dumps({"type": "ack", "data": data}))
except WebSocketDisconnect:
break
except WebSocketDisconnect:
pass
finally:
# Remove connection when client disconnects
if station in self.active_connections:
self.active_connections[station].discard(websocket)
# Clean up empty station sets
if not self.active_connections[station]:
del self.active_connections[station]
async def _get_now_playing_data(self, station: str) -> dict:
"""
Get now playing data for a specific station.
Parameters:
- **station** (str): Station name
Returns:
- **dict**: Current track information
"""
ret_obj: dict = {**self.radio_util.now_playing.get(station, {})}
ret_obj["station"] = station
try:
if "start" in ret_obj:
ret_obj["elapsed"] = int(time.time()) - ret_obj["start"]
else:
ret_obj["elapsed"] = 0
except KeyError:
ret_obj["elapsed"] = 0
# Remove sensitive file path info
ret_obj.pop("file_path", None)
return ret_obj
async def broadcast_track_change(self, station: str, track_data: dict):
"""
Broadcast track change to all connected WebSocket clients for a station.
Parameters:
- **station** (str): Station name
- **track_data** (dict): New track information
"""
if station not in self.active_connections:
return
# Remove sensitive file path info
track_data.pop("file_path", None)
# Create broadcast message
broadcast_data = {
"type": "track_change",
"data": track_data
}
# Send to all connected clients for this station
disconnected_clients = set()
for websocket in self.active_connections[station]:
try:
await websocket.send_text(json.dumps(broadcast_data))
except Exception as e:
logging.warning(f"Failed to send WebSocket message: {e}")
disconnected_clients.add(websocket)
# Remove failed connections
for websocket in disconnected_clients:
self.active_connections[station].discard(websocket)
# Broadcast LRC asynchronously
asyncio.create_task(self._broadcast_lrc(station, track_data))
async def _send_lrc_to_client(self, websocket: WebSocket, station: str, track_data: dict):
"""Send LRC data to a specific client asynchronously."""
logging.info(f"Sending LRC to client for station {station}")
logging.info(f"Track data: {track_data}")
try:
artist: Optional[str] = track_data.get("artist")
title: Optional[str] = track_data.get("song") # Changed from "title" to "song"
duration: Optional[int] = track_data.get("duration")
# Check if LRC is already cached
cached_lrc = self.lrc_cache.get(station)
if cached_lrc:
logging.info("LRC found in cache, sending immediately.")
lrc_data: dict = {
"type": "lrc",
"data": cached_lrc,
"source": "Cache"
}
await websocket.send_text(json.dumps(lrc_data))
return
if artist and title:
logging.info(f"Fetching LRC for {artist} - {title} (duration: {duration})")
lrc: Optional[str] = await self.sr_util.get_lrc_by_artist_song(
artist, title, duration=duration
)
source: str = "SR"
if not lrc:
logging.info(f"No LRC from SR, trying LRCLib for {artist} - {title}")
lrclib_result = await self.lrclib.search(artist, title, plain=False)
if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
lrc = lrclib_result.lyrics
source = "LRCLib"
logging.info("LRC found via LRCLib fallback")
self.lrc_cache[station] = lrc # Cache the LRC for future use
logging.info(f"LRC fetched: {lrc is not None}")
if lrc:
lrc_data: dict = {
"type": "lrc",
"data": lrc,
"source": source
}
await websocket.send_text(json.dumps(lrc_data))
logging.info("LRC sent to client")
except Exception as e:
logging.error(f"Failed to send LRC to client: {e}")
async def _broadcast_lrc(self, station: str, track_data: dict):
"""Broadcast LRC data to all connected clients for a station asynchronously."""
if station not in self.active_connections:
return
try:
artist: Optional[str] = track_data.get("artist")
title: Optional[str] = track_data.get("song") # Changed from "title" to "song"
duration: Optional[int] = track_data.get("duration")
# Check if LRC is already cached
cached_lrc = self.lrc_cache.get(station)
if cached_lrc:
logging.info("LRC found in cache, broadcasting immediately.")
lrc_data: dict = {
"type": "lrc",
"data": cached_lrc,
"source": "Cache"
}
disconnected_clients = set()
for websocket in self.active_connections[station]:
try:
await websocket.send_text(json.dumps(lrc_data))
except Exception as e:
logging.warning(f"Failed to send LRC to client: {e}")
disconnected_clients.add(websocket)
# Remove failed connections
for websocket in disconnected_clients:
self.active_connections[station].discard(websocket)
return
if artist and title:
logging.info(f"Broadcasting LRC fetch for {artist} - {title} (duration: {duration})")
lrc: Optional[str] = await self.sr_util.get_lrc_by_artist_song(
artist, title, duration=duration
)
source: str = "SR"
if not lrc:
logging.info(f"No LRC from SR, trying LRCLib for {artist} - {title}")
lrclib_result = await self.lrclib.search(artist, title, plain=False)
if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
lrc = lrclib_result.lyrics
source = "LRCLib"
logging.info("LRC found via LRCLib fallback")
self.lrc_cache[station] = lrc # Cache the LRC for future use
logging.info(f"LRC fetched for broadcast: {lrc is not None}")
if lrc:
lrc_data: dict = {
"type": "lrc",
"data": lrc,
"source": source
}
# Send to all connected clients
disconnected_clients = set()
for websocket in self.active_connections[station]:
try:
await websocket.send_text(json.dumps(lrc_data))
except Exception as e:
logging.warning(f"Failed to send LRC to client: {e}")
disconnected_clients.add(websocket)
# Remove failed connections
for websocket in disconnected_clients:
self.active_connections[station].discard(websocket)
logging.info("LRC broadcasted to clients")
except Exception as e:
logging.error(f"Failed to broadcast LRC: {e}")