Enhance LRC and SR duration matching logic by increasing tolerance from 5 seconds to 10 seconds + improve LRC websocket broadcasting logic

This commit is contained in:
2025-09-27 09:17:24 -04:00
parent 00614326a4
commit 061aed296f
6 changed files with 722 additions and 108 deletions

View File

@@ -31,11 +31,14 @@ from fastapi import (
from fastapi_throttle import RateLimiter
from fastapi.responses import RedirectResponse, JSONResponse, FileResponse
from auth.deps import get_current_user
from collections import defaultdict
class Radio(FastAPI):
"""Radio Endpoints"""
def __init__(self, app: FastAPI, my_util, constants, loop) -> None:
# Initialize broadcast locks to prevent duplicate events (will be set in on_start)
self.broadcast_locks = {}
self.app: FastAPI = app
self.util = my_util
self.constants = constants
@@ -44,9 +47,12 @@ class Radio(FastAPI):
self.sr_util = SRUtil()
self.lrclib = LRCLib()
self.lrc_cache: Dict[str, Optional[str]] = {}
self.lrc_cache_locks = {}
self.playlists_loaded: bool = False
# WebSocket connection management
self.active_connections: Dict[str, Set[WebSocket]] = {}
# Initialize broadcast locks to prevent duplicate events
self.broadcast_locks = defaultdict(asyncio.Lock)
self.endpoints: dict = {
"radio/np": self.radio_now_playing,
"radio/request": self.radio_request,
@@ -80,6 +86,9 @@ class Radio(FastAPI):
app.add_event_handler("startup", self.on_start)
async def on_start(self) -> None:
# Initialize locks in the event loop
self.lrc_cache_locks = defaultdict(asyncio.Lock)
self.broadcast_locks = defaultdict(asyncio.Lock)
stations = ", ".join(self.radio_util.db_queries.keys())
logging.info("radio: Initializing stations:\n%s", stations)
await self.radio_util.load_playlists()
@@ -424,16 +433,41 @@ class Radio(FastAPI):
next["start"] = time_started
next["end"] = time_ends
# Clear the LRC cache for the station
self.lrc_cache.pop(data.station, None)
# Use BackgroundTasks with a sync wrapper for LRC fetch/cache
def lrc_fetch_sync(station, track_json):
import asyncio
try:
async def lrc_fetch():
async with self.lrc_cache_locks[station]:
self.lrc_cache.pop(station, None)
lrc, source = await self._fetch_and_cache_lrc(station, track_json)
if lrc:
self.lrc_cache[station] = lrc
else:
self.lrc_cache[station] = None
if lrc:
await self.broadcast_lrc(station, lrc, source)
asyncio.run(lrc_fetch())
except Exception as e:
logging.error(f"[LRC] Error during LRC fetch/cache: {e}")
try:
# Pass a copy of the track dict to avoid mutation issues
background_tasks.add_task(lrc_fetch_sync, data.station, next.copy())
except Exception as e:
logging.error(f"[LRC] Could not schedule LRC fetch task: {e}")
try:
background_tasks.add_task(self.radio_util.webhook_song_change, next, data.station)
# Broadcast track change to WebSocket clients
background_tasks.add_task(self.broadcast_track_change, data.station, next.copy())
background_tasks.add_task(self.radio_util.webhook_song_change, next, data.station)
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
try:
await self.broadcast_track_change(data.station, next.copy())
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
try:
album_art = self.radio_util.get_album_art(track_id=next["id"])
if not album_art:
@@ -441,6 +475,7 @@ class Radio(FastAPI):
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
return JSONResponse(content=next)
async def radio_request(
@@ -545,8 +580,16 @@ class Radio(FastAPI):
current_track.pop("file_path", None) # Ensure file_path is stripped
await websocket.send_text(json.dumps(current_track))
# Send LRC asynchronously
asyncio.create_task(self._send_lrc_to_client(websocket, station, current_track))
# Send cached LRC if available; do not attempt to fetch again for this client
async with self.lrc_cache_locks[station]:
cached_lrc = self.lrc_cache.get(station)
if cached_lrc:
lrc_data = {
"type": "lrc",
"data": cached_lrc,
"source": "Cache"
}
await websocket.send_text(json.dumps(lrc_data))
# Keep connection alive and handle incoming messages
while True:
@@ -600,129 +643,178 @@ class Radio(FastAPI):
- **station** (str): Station name
- **track_data** (dict): New track information
"""
async with self.broadcast_locks[station]:
if station not in self.active_connections:
return
# Take a snapshot of current clients for this update
current_clients = set(self.active_connections[station])
if not current_clients:
return
# Remove sensitive file path info before broadcasting
track_data_clean = track_data.copy()
track_data_clean.pop("file_path", None)
# Create and send track change message first
broadcast_data = {
"type": "track_change",
"data": track_data_clean
}
# Send track change to all clients
disconnected_clients = set()
for websocket in current_clients:
try:
await websocket.send_text(json.dumps(broadcast_data))
except Exception as e:
logging.warning(f"[Track] Failed to send track change: {e}")
disconnected_clients.add(websocket)
# Remove disconnected clients from our snapshot and active connections
current_clients -= disconnected_clients
for websocket in disconnected_clients:
self.active_connections[station].discard(websocket)
if not current_clients:
logging.warning("[Track] No clients remaining after track broadcast")
return
async def broadcast_lrc(self, station: str, lrc: str, source: str):
"""Broadcast LRC data to all connected clients for a station."""
if station not in self.active_connections:
return
# Remove sensitive file path info
track_data.pop("file_path", None)
# Create broadcast message
broadcast_data = {
"type": "track_change",
"data": track_data
current_clients = set(self.active_connections[station])
if not current_clients:
return
lrc_data = {
"type": "lrc",
"data": lrc,
"source": source
}
# Send to all connected clients for this station
disconnected_clients = set()
for websocket in self.active_connections[station]:
for websocket in current_clients:
try:
await websocket.send_text(json.dumps(broadcast_data))
await websocket.send_text(json.dumps(lrc_data))
except Exception as e:
logging.warning(f"Failed to send WebSocket message: {e}")
logging.warning(f"[LRC Broadcast] Failed to send to client: {e}")
disconnected_clients.add(websocket)
# Remove failed connections
for websocket in disconnected_clients:
self.active_connections[station].discard(websocket)
# Broadcast LRC asynchronously
asyncio.create_task(self._broadcast_lrc(station, track_data))
async def _send_lrc_to_client(self, websocket: WebSocket, station: str, track_data: dict):
"""Send LRC data to a specific client asynchronously."""
logging.info(f"Sending LRC to client for station {station}")
logging.info(f"Track data: {track_data}")
"""Send cached LRC data to a specific client asynchronously. Only sends if LRC exists in cache."""
logging.info(f"[LRC Send] Checking cached LRC for station {station}")
logging.info(f"[LRC Send] Current track: {track_data.get('artist', 'Unknown')} - {track_data.get('song', 'Unknown')}")
try:
# Always check if LRC is already cached
# Only send if LRC is in cache
cached_lrc = self.lrc_cache.get(station)
logging.info(f"[LRC Send] Cache status for station {station}: {'Found' if cached_lrc else 'Not found'}")
if cached_lrc:
logging.info("Using cached LRC for client")
logging.info("[LRC Send] Sending cached LRC to client")
lrc_data: dict = {
"type": "lrc",
"data": cached_lrc,
"source": "Cache"
}
await websocket.send_text(json.dumps(lrc_data))
return
logging.info("[LRC Send] Successfully sent cached LRC to client")
else:
logging.info(f"[LRC Send] No cached LRC available for station {station}")
except Exception as e:
logging.error(f"[LRC Send] Failed to send cached LRC to client: {e}")
logging.error(f"[LRC Send] Error details: {traceback.format_exc()}")
# Fetch LRC if not cached
async def send_lrc_to_client(self, websocket: WebSocket, station: str, track_data: dict):
"""Send cached LRC data to a specific client asynchronously. Only sends if valid LRC exists in cache."""
try:
track_info = f"{track_data.get('artist', 'Unknown')} - {track_data.get('song', 'Unknown')}"
logging.info(f"[LRC Send {id(websocket)}] Starting LRC send for {track_info}")
logging.info(f"[LRC Send {id(websocket)}] Cache keys before lock: {list(self.lrc_cache.keys())}")
# Get cached LRC with lock to ensure consistency
async with self.lrc_cache_locks[station]:
logging.info(f"[LRC Send {id(websocket)}] Got cache lock")
cached_lrc = self.lrc_cache.get(station)
logging.info(f"[LRC Send {id(websocket)}] Cache keys during lock: {list(self.lrc_cache.keys())}")
logging.info(f"[LRC Send {id(websocket)}] Cache entry length: {len(cached_lrc) if cached_lrc else 0}")
# Only send if we have actual lyrics
if cached_lrc:
logging.info(f"[LRC Send {id(websocket)}] Preparing to send {len(cached_lrc)} bytes of LRC")
lrc_data: dict = {
"type": "lrc",
"data": cached_lrc,
"source": "Cache"
}
await websocket.send_text(json.dumps(lrc_data))
logging.info(f"[LRC Send {id(websocket)}] Successfully sent LRC")
else:
logging.info(f"[LRC Send {id(websocket)}] No LRC in cache")
# If we have no cache entry, let's check if a fetch is needed
async with self.lrc_cache_locks[station]:
logging.info(f"[LRC Send {id(websocket)}] Checking if fetch needed")
# Only attempt fetch if we're the first to notice missing lyrics
if station not in self.lrc_cache:
logging.info(f"[LRC Send {id(websocket)}] Initiating LRC fetch")
lrc, source = await self._fetch_and_cache_lrc(station, track_data)
if lrc:
self.lrc_cache[station] = lrc
lrc_data: dict = {
"type": "lrc",
"data": lrc,
"source": source
}
await websocket.send_text(json.dumps(lrc_data))
logging.info(f"[LRC Send {id(websocket)}] Sent newly fetched LRC")
except Exception as e:
logging.error(f"[LRC Send {id(websocket)}] Failed: {e}")
logging.error(f"[LRC Send {id(websocket)}] Error details: {traceback.format_exc()}")
async def _fetch_and_cache_lrc(self, station: str, track_data: dict) -> tuple[Optional[str], str]:
"""Fetch and cache LRC data for a station's current track."""
try:
artist: Optional[str] = track_data.get("artist")
title: Optional[str] = track_data.get("song")
duration: Optional[int] = track_data.get("duration")
if artist and title:
logging.info(f"Fetching LRC for {artist} - {title} (duration: {duration})")
lrc: Optional[str] = await self.sr_util.get_lrc_by_artist_song(
artist, title, duration=duration
)
source: str = "SR"
if not lrc:
logging.info(f"No LRC from SR, trying LRCLib for {artist} - {title}")
if not (artist and title):
logging.info("[LRC] Missing artist or title, skipping fetch")
return None, "None"
logging.info(f"[LRC] Starting fetch for {station}: {artist} - {title}")
# Try SR first with timeout
try:
async with asyncio.timeout(5.0): # 5 second timeout
lrc = await self.sr_util.get_lrc_by_artist_song(
artist, title, duration=duration
)
if lrc:
logging.info("[LRC] Found from SR")
return lrc, "SR"
except asyncio.TimeoutError:
logging.warning("[LRC] SR fetch timed out")
except Exception as e:
logging.error(f"[LRC] SR fetch error: {e}")
logging.info("[LRC] SR fetch completed without results")
# Try LRCLib as fallback with timeout
try:
async with asyncio.timeout(5.0): # 5 second timeout
logging.info("[LRC] Trying LRCLib fallback")
lrclib_result = await self.lrclib.search(artist, title, plain=False)
if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
lrc = lrclib_result.lyrics
source = "LRCLib"
logging.info("LRC found via LRCLib fallback")
if lrc:
self.lrc_cache[station] = lrc # Cache the LRC regardless of source
lrc_data: dict = {
"type": "lrc",
"data": lrc,
"source": source
}
await websocket.send_text(json.dumps(lrc_data))
logging.info("LRC sent to client")
else:
logging.info("No LRC found from any source.")
logging.info("[LRC] Found from LRCLib")
return lrclib_result.lyrics, "LRCLib"
except asyncio.TimeoutError:
logging.warning("[LRC] LRCLib fetch timed out")
except Exception as e:
logging.error(f"[LRC] LRCLib fetch error: {e}")
logging.info("[LRC] No lyrics found from any source")
return None, "None"
except Exception as e:
logging.error(f"Failed to send LRC to client: {e}")
async def _broadcast_lrc(self, station: str, track_data: dict):
"""Broadcast LRC data to all connected clients for a station asynchronously."""
if station not in self.active_connections:
return
try:
# Clear the LRC cache for the station on track change
self.lrc_cache.pop(station, None)
# Fetch LRC if not cached
artist: Optional[str] = track_data.get("artist")
title: Optional[str] = track_data.get("song") # Changed from "title" to "song"
duration: Optional[int] = track_data.get("duration")
if artist and title:
logging.info(f"Broadcasting LRC fetch for {artist} - {title} (duration: {duration})")
lrc: Optional[str] = await self.sr_util.get_lrc_by_artist_song(
artist, title, duration=duration
)
source: str = "SR"
if not lrc:
logging.info(f"No LRC from SR, trying LRCLib for {artist} - {title}")
lrclib_result = await self.lrclib.search(artist, title, plain=False)
if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
lrc = lrclib_result.lyrics
source = "LRCLib"
logging.info("LRC found via LRCLib fallback")
self.lrc_cache[station] = lrc # Cache the LRC
logging.info(f"LRC fetched for broadcast: {lrc is not None}")
if lrc:
lrc_data: dict = {
"type": "lrc",
"data": lrc,
"source": source
}
# Send to all connected clients
disconnected_clients = set()
for websocket in self.active_connections[station]:
try:
await websocket.send_text(json.dumps(lrc_data))
except Exception as e:
logging.warning(f"Failed to send LRC to client: {e}")
disconnected_clients.add(websocket)
for websocket in disconnected_clients:
self.active_connections[station].discard(websocket)
logging.info("LRC broadcasted to clients")
except Exception as e:
logging.error(f"Failed to broadcast LRC: {e}")
logging.error(f"[LRC] Error fetching lyrics: {e}")
return None, "None"