From 061aed296f21880568dce6f109dc42cf8d57210f Mon Sep 17 00:00:00 2001 From: codey Date: Sat, 27 Sep 2025 09:17:24 -0400 Subject: [PATCH] Enhance LRC and SR duration matching logic by increasing tolerance from 5 seconds to 10 seconds + improve LRC websocket broadcasting logic --- endpoints/radio.py | 304 +++++++++++++++++++++------------ lyric_search/sources/lrclib.py | 2 +- test/liquidsoap.liq | 252 +++++++++++++++++++++++++++ test/liquidsoap.ls | 270 +++++++++++++++++++++++++++++ test/minimal_test.liq | 0 utils/sr_wrapper.py | 2 +- 6 files changed, 722 insertions(+), 108 deletions(-) create mode 100644 test/liquidsoap.liq create mode 100644 test/liquidsoap.ls create mode 100644 test/minimal_test.liq diff --git a/endpoints/radio.py b/endpoints/radio.py index ecec084..85c3209 100644 --- a/endpoints/radio.py +++ b/endpoints/radio.py @@ -31,11 +31,14 @@ from fastapi import ( from fastapi_throttle import RateLimiter from fastapi.responses import RedirectResponse, JSONResponse, FileResponse from auth.deps import get_current_user +from collections import defaultdict class Radio(FastAPI): """Radio Endpoints""" def __init__(self, app: FastAPI, my_util, constants, loop) -> None: + # Initialize broadcast locks to prevent duplicate events (will be set in on_start) + self.broadcast_locks = {} self.app: FastAPI = app self.util = my_util self.constants = constants @@ -44,9 +47,12 @@ class Radio(FastAPI): self.sr_util = SRUtil() self.lrclib = LRCLib() self.lrc_cache: Dict[str, Optional[str]] = {} + self.lrc_cache_locks = {} self.playlists_loaded: bool = False # WebSocket connection management self.active_connections: Dict[str, Set[WebSocket]] = {} + # Initialize broadcast locks to prevent duplicate events + self.broadcast_locks = defaultdict(asyncio.Lock) self.endpoints: dict = { "radio/np": self.radio_now_playing, "radio/request": self.radio_request, @@ -80,6 +86,9 @@ class Radio(FastAPI): app.add_event_handler("startup", self.on_start) async def on_start(self) -> None: + # Initialize locks in the event loop + self.lrc_cache_locks = defaultdict(asyncio.Lock) + self.broadcast_locks = defaultdict(asyncio.Lock) stations = ", ".join(self.radio_util.db_queries.keys()) logging.info("radio: Initializing stations:\n%s", stations) await self.radio_util.load_playlists() @@ -424,16 +433,41 @@ class Radio(FastAPI): next["start"] = time_started next["end"] = time_ends - # Clear the LRC cache for the station - self.lrc_cache.pop(data.station, None) + # Use BackgroundTasks with a sync wrapper for LRC fetch/cache + def lrc_fetch_sync(station, track_json): + import asyncio + try: + async def lrc_fetch(): + async with self.lrc_cache_locks[station]: + self.lrc_cache.pop(station, None) + lrc, source = await self._fetch_and_cache_lrc(station, track_json) + if lrc: + self.lrc_cache[station] = lrc + else: + self.lrc_cache[station] = None + if lrc: + await self.broadcast_lrc(station, lrc, source) + asyncio.run(lrc_fetch()) + except Exception as e: + logging.error(f"[LRC] Error during LRC fetch/cache: {e}") + try: + # Pass a copy of the track dict to avoid mutation issues + background_tasks.add_task(lrc_fetch_sync, data.station, next.copy()) + except Exception as e: + logging.error(f"[LRC] Could not schedule LRC fetch task: {e}") try: - background_tasks.add_task(self.radio_util.webhook_song_change, next, data.station) - # Broadcast track change to WebSocket clients - background_tasks.add_task(self.broadcast_track_change, data.station, next.copy()) + background_tasks.add_task(self.radio_util.webhook_song_change, next, data.station) except Exception as e: logging.info("radio_get_next Exception: %s", str(e)) traceback.print_exc() + + try: + await self.broadcast_track_change(data.station, next.copy()) + except Exception as e: + logging.info("radio_get_next Exception: %s", str(e)) + traceback.print_exc() + try: album_art = self.radio_util.get_album_art(track_id=next["id"]) if not album_art: @@ -441,6 +475,7 @@ class Radio(FastAPI): except Exception as e: logging.info("radio_get_next Exception: %s", str(e)) traceback.print_exc() + return JSONResponse(content=next) async def radio_request( @@ -545,8 +580,16 @@ class Radio(FastAPI): current_track.pop("file_path", None) # Ensure file_path is stripped await websocket.send_text(json.dumps(current_track)) - # Send LRC asynchronously - asyncio.create_task(self._send_lrc_to_client(websocket, station, current_track)) + # Send cached LRC if available; do not attempt to fetch again for this client + async with self.lrc_cache_locks[station]: + cached_lrc = self.lrc_cache.get(station) + if cached_lrc: + lrc_data = { + "type": "lrc", + "data": cached_lrc, + "source": "Cache" + } + await websocket.send_text(json.dumps(lrc_data)) # Keep connection alive and handle incoming messages while True: @@ -600,129 +643,178 @@ class Radio(FastAPI): - **station** (str): Station name - **track_data** (dict): New track information """ + async with self.broadcast_locks[station]: + if station not in self.active_connections: + return + + # Take a snapshot of current clients for this update + current_clients = set(self.active_connections[station]) + if not current_clients: + return + + # Remove sensitive file path info before broadcasting + track_data_clean = track_data.copy() + track_data_clean.pop("file_path", None) + + # Create and send track change message first + broadcast_data = { + "type": "track_change", + "data": track_data_clean + } + + # Send track change to all clients + disconnected_clients = set() + for websocket in current_clients: + try: + await websocket.send_text(json.dumps(broadcast_data)) + except Exception as e: + logging.warning(f"[Track] Failed to send track change: {e}") + disconnected_clients.add(websocket) + + # Remove disconnected clients from our snapshot and active connections + current_clients -= disconnected_clients + for websocket in disconnected_clients: + self.active_connections[station].discard(websocket) + + if not current_clients: + logging.warning("[Track] No clients remaining after track broadcast") + return + + async def broadcast_lrc(self, station: str, lrc: str, source: str): + """Broadcast LRC data to all connected clients for a station.""" if station not in self.active_connections: return - - # Remove sensitive file path info - track_data.pop("file_path", None) - - # Create broadcast message - broadcast_data = { - "type": "track_change", - "data": track_data + current_clients = set(self.active_connections[station]) + if not current_clients: + return + lrc_data = { + "type": "lrc", + "data": lrc, + "source": source } - - # Send to all connected clients for this station disconnected_clients = set() - for websocket in self.active_connections[station]: + for websocket in current_clients: try: - await websocket.send_text(json.dumps(broadcast_data)) + await websocket.send_text(json.dumps(lrc_data)) except Exception as e: - logging.warning(f"Failed to send WebSocket message: {e}") + logging.warning(f"[LRC Broadcast] Failed to send to client: {e}") disconnected_clients.add(websocket) - - # Remove failed connections for websocket in disconnected_clients: self.active_connections[station].discard(websocket) - - # Broadcast LRC asynchronously - asyncio.create_task(self._broadcast_lrc(station, track_data)) async def _send_lrc_to_client(self, websocket: WebSocket, station: str, track_data: dict): - """Send LRC data to a specific client asynchronously.""" - logging.info(f"Sending LRC to client for station {station}") - logging.info(f"Track data: {track_data}") + """Send cached LRC data to a specific client asynchronously. Only sends if LRC exists in cache.""" + logging.info(f"[LRC Send] Checking cached LRC for station {station}") + logging.info(f"[LRC Send] Current track: {track_data.get('artist', 'Unknown')} - {track_data.get('song', 'Unknown')}") try: - # Always check if LRC is already cached + # Only send if LRC is in cache cached_lrc = self.lrc_cache.get(station) + logging.info(f"[LRC Send] Cache status for station {station}: {'Found' if cached_lrc else 'Not found'}") if cached_lrc: - logging.info("Using cached LRC for client") + logging.info("[LRC Send] Sending cached LRC to client") lrc_data: dict = { "type": "lrc", "data": cached_lrc, "source": "Cache" } await websocket.send_text(json.dumps(lrc_data)) - return + logging.info("[LRC Send] Successfully sent cached LRC to client") + else: + logging.info(f"[LRC Send] No cached LRC available for station {station}") + except Exception as e: + logging.error(f"[LRC Send] Failed to send cached LRC to client: {e}") + logging.error(f"[LRC Send] Error details: {traceback.format_exc()}") - # Fetch LRC if not cached + async def send_lrc_to_client(self, websocket: WebSocket, station: str, track_data: dict): + """Send cached LRC data to a specific client asynchronously. Only sends if valid LRC exists in cache.""" + try: + track_info = f"{track_data.get('artist', 'Unknown')} - {track_data.get('song', 'Unknown')}" + logging.info(f"[LRC Send {id(websocket)}] Starting LRC send for {track_info}") + logging.info(f"[LRC Send {id(websocket)}] Cache keys before lock: {list(self.lrc_cache.keys())}") + + # Get cached LRC with lock to ensure consistency + async with self.lrc_cache_locks[station]: + logging.info(f"[LRC Send {id(websocket)}] Got cache lock") + cached_lrc = self.lrc_cache.get(station) + logging.info(f"[LRC Send {id(websocket)}] Cache keys during lock: {list(self.lrc_cache.keys())}") + logging.info(f"[LRC Send {id(websocket)}] Cache entry length: {len(cached_lrc) if cached_lrc else 0}") + + # Only send if we have actual lyrics + if cached_lrc: + logging.info(f"[LRC Send {id(websocket)}] Preparing to send {len(cached_lrc)} bytes of LRC") + lrc_data: dict = { + "type": "lrc", + "data": cached_lrc, + "source": "Cache" + } + await websocket.send_text(json.dumps(lrc_data)) + logging.info(f"[LRC Send {id(websocket)}] Successfully sent LRC") + else: + logging.info(f"[LRC Send {id(websocket)}] No LRC in cache") + # If we have no cache entry, let's check if a fetch is needed + async with self.lrc_cache_locks[station]: + logging.info(f"[LRC Send {id(websocket)}] Checking if fetch needed") + # Only attempt fetch if we're the first to notice missing lyrics + if station not in self.lrc_cache: + logging.info(f"[LRC Send {id(websocket)}] Initiating LRC fetch") + lrc, source = await self._fetch_and_cache_lrc(station, track_data) + if lrc: + self.lrc_cache[station] = lrc + lrc_data: dict = { + "type": "lrc", + "data": lrc, + "source": source + } + await websocket.send_text(json.dumps(lrc_data)) + logging.info(f"[LRC Send {id(websocket)}] Sent newly fetched LRC") + except Exception as e: + logging.error(f"[LRC Send {id(websocket)}] Failed: {e}") + logging.error(f"[LRC Send {id(websocket)}] Error details: {traceback.format_exc()}") + + async def _fetch_and_cache_lrc(self, station: str, track_data: dict) -> tuple[Optional[str], str]: + """Fetch and cache LRC data for a station's current track.""" + try: artist: Optional[str] = track_data.get("artist") title: Optional[str] = track_data.get("song") duration: Optional[int] = track_data.get("duration") - - if artist and title: - logging.info(f"Fetching LRC for {artist} - {title} (duration: {duration})") - lrc: Optional[str] = await self.sr_util.get_lrc_by_artist_song( - artist, title, duration=duration - ) - source: str = "SR" - if not lrc: - logging.info(f"No LRC from SR, trying LRCLib for {artist} - {title}") + + if not (artist and title): + logging.info("[LRC] Missing artist or title, skipping fetch") + return None, "None" + + logging.info(f"[LRC] Starting fetch for {station}: {artist} - {title}") + + # Try SR first with timeout + try: + async with asyncio.timeout(5.0): # 5 second timeout + lrc = await self.sr_util.get_lrc_by_artist_song( + artist, title, duration=duration + ) + if lrc: + logging.info("[LRC] Found from SR") + return lrc, "SR" + except asyncio.TimeoutError: + logging.warning("[LRC] SR fetch timed out") + except Exception as e: + logging.error(f"[LRC] SR fetch error: {e}") + + logging.info("[LRC] SR fetch completed without results") + + # Try LRCLib as fallback with timeout + try: + async with asyncio.timeout(5.0): # 5 second timeout + logging.info("[LRC] Trying LRCLib fallback") lrclib_result = await self.lrclib.search(artist, title, plain=False) if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str): - lrc = lrclib_result.lyrics - source = "LRCLib" - logging.info("LRC found via LRCLib fallback") - if lrc: - self.lrc_cache[station] = lrc # Cache the LRC regardless of source - lrc_data: dict = { - "type": "lrc", - "data": lrc, - "source": source - } - await websocket.send_text(json.dumps(lrc_data)) - logging.info("LRC sent to client") - else: - logging.info("No LRC found from any source.") + logging.info("[LRC] Found from LRCLib") + return lrclib_result.lyrics, "LRCLib" + except asyncio.TimeoutError: + logging.warning("[LRC] LRCLib fetch timed out") + except Exception as e: + logging.error(f"[LRC] LRCLib fetch error: {e}") + + logging.info("[LRC] No lyrics found from any source") + return None, "None" except Exception as e: - logging.error(f"Failed to send LRC to client: {e}") - - async def _broadcast_lrc(self, station: str, track_data: dict): - """Broadcast LRC data to all connected clients for a station asynchronously.""" - if station not in self.active_connections: - return - - try: - # Clear the LRC cache for the station on track change - self.lrc_cache.pop(station, None) - - # Fetch LRC if not cached - artist: Optional[str] = track_data.get("artist") - title: Optional[str] = track_data.get("song") # Changed from "title" to "song" - duration: Optional[int] = track_data.get("duration") - - if artist and title: - logging.info(f"Broadcasting LRC fetch for {artist} - {title} (duration: {duration})") - lrc: Optional[str] = await self.sr_util.get_lrc_by_artist_song( - artist, title, duration=duration - ) - source: str = "SR" - if not lrc: - logging.info(f"No LRC from SR, trying LRCLib for {artist} - {title}") - lrclib_result = await self.lrclib.search(artist, title, plain=False) - if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str): - lrc = lrclib_result.lyrics - source = "LRCLib" - logging.info("LRC found via LRCLib fallback") - self.lrc_cache[station] = lrc # Cache the LRC - logging.info(f"LRC fetched for broadcast: {lrc is not None}") - if lrc: - lrc_data: dict = { - "type": "lrc", - "data": lrc, - "source": source - } - - # Send to all connected clients - disconnected_clients = set() - for websocket in self.active_connections[station]: - try: - await websocket.send_text(json.dumps(lrc_data)) - except Exception as e: - logging.warning(f"Failed to send LRC to client: {e}") - disconnected_clients.add(websocket) - for websocket in disconnected_clients: - self.active_connections[station].discard(websocket) - logging.info("LRC broadcasted to clients") - except Exception as e: - logging.error(f"Failed to broadcast LRC: {e}") + logging.error(f"[LRC] Error fetching lyrics: {e}") + return None, "None" diff --git a/lyric_search/sources/lrclib.py b/lyric_search/sources/lrclib.py index 581d432..694d00b 100644 --- a/lyric_search/sources/lrclib.py +++ b/lyric_search/sources/lrclib.py @@ -81,7 +81,7 @@ class LRCLib: # Filter by duration if provided if duration: - search_data = [r for r in search_data if abs(r.get("duration", 0) - duration) <= 5] + search_data = [r for r in search_data if abs(r.get("duration", 0) - duration) <= 10] if plain: possible_matches = [ diff --git a/test/liquidsoap.liq b/test/liquidsoap.liq new file mode 100644 index 0000000..eb96797 --- /dev/null +++ b/test/liquidsoap.liq @@ -0,0 +1,252 @@ +#!/usr/bin/env liquidsoap + +set("log.file.path","/home/kyle/.lsl.txt") +set("log.stdout",true) +set("harbor.bind_addrs", ["127.0.0.1"]) + +# Buffer and timing settings +set("frame.duration",0.02) +set("root.max_latency",2.) +set("audio.converter.samplerate.libsamplerate.quality","best") +set("clock.allow_streaming_errors",false) + + +# Get next track dynamically [Each station] + +def get_next_main() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py main")) + [request.create(uri)] +end + +def get_next_rock() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py rock")) + [request.create(uri)] +end + +def get_next_electronic() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py electronic")) + [request.create(uri)] +end + +def get_next_rap() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py rap")) + [request.create(uri)] +end + +#def get_next_classical() = +# uri = list.hd(default="", process.read.lines("uv run get_next_track.py classical")) +# [request.create(uri)] +#end + +def get_next_pop() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py pop")) + [request.create(uri)] +end + + +# Set up queues [Each station] + +main_list = request.dynamic( + id="requests", + get_next_main, + retry_delay=1.0, + timeout=20.0 +) + + +rock_list = request.dynamic( + id="rock_requests", + get_next_rock, + retry_delay=1.0, + timeout=20.0 +) + +electronic_list = request.dynamic( + id="electronic_requests", + get_next_electronic, + retry_delay=1.0, + timeout=20.0 +) + +rap_list = request.dynamic( + id="rap_requests", + get_next_rap, + retry_delay=1.0, + timeout=20.0 +) + +#classical_list = request.dynamic.list( +# id="classical_requests", +# get_next_classical, +# prefetch=0 +#) + +pop_list = request.dynamic( + id="pop_requests", + get_next_pop, + retry_delay=1.0, + timeout=20.0 +) + + +# Standard + +silence = single("/home/kyle/ls/silence.ogg") + +# Queue [Each station] + +def main_queue(remaining, _) = + log("MAIN: Queueing with #{remaining} seconds remaining") + if not main_list.fetch() then + log("Fetching next query failed") + end +end + +def rock_queue(remaining, _) = + log("ROCK: Queueing with #{remaining} seconds remaining") + if not rock_list.fetch() then + log("Fetching next query failed") + end +end + +def electronic_queue(remaining, _) = + log("ELECTRONIC: Queueing with #{remaining} seconds remaining") + if not electronic_list.fetch() then + log("Fetching next query failed") + end +end + +def rap_queue(remaining, _) = + log("RAP: Queueing with #{remaining} seconds remaining") + if not rap_list.fetch() then + log("Fetching next query failed") + end +end + +#def classical_queue(remaining, _) = +# log("CLASSICAL: Queueing with #{remaining} seconds remaining") +# if not classical_list.fetch() then +# log("Fetching next query failed") +# end +#end + +def pop_queue(remaining, _) = + log("POP: Queueing with #{remaining} seconds remaining") + if not pop_list.fetch() then + log("Fetching next query failed") + end +end + + + +# Initial fetch [Each station] + +main_list.fetch() +rock_list.fetch() +electronic_list.fetch() +rap_list.fetch() +#classical_list.fetch() +pop_list.fetch() + +# Source setup [Each station] + +def create_source(s,q) = + source.dynamic(s, track_sensitive=true, {q()}) +end + +main_source = create_source(main_list, main_queue) +rock_source = create_source(rock_list, rock_queue) +electronic_source = create_source(electronic_list, electronic_queue) +rap_source = create_source(rap_list, rap_queue) +#classical_source = create_source(classical_list, classical_queue) +pop_source = create_source(pop_list, pop_queue) + +all_tracks_main = fallback(track_sensitive=false, [main_source, silence]) +all_tracks_rock = fallback(track_sensitive=false, [rock_source, silence]) +all_tracks_electronic = fallback(track_sensitive=false, [electronic_source, silence]) +all_tracks_rap = fallback(track_sensitive=false, [rap_source, silence]) +#all_tracks_classical = fallback(track_sensitive=false, [classical_source, silence]) +all_tracks_pop = fallback(track_sensitive=false, [pop_source, silence]) + +# HLS Setup [Standard] + +aac_lofi = %ffmpeg(format="mpegts", + %audio(codec="aac", + channels=2, + ar=48000, + b="128k")) + +aac_midfi = %ffmpeg(format="mpegts", + %audio(codec="aac", + channels=2, + ar=48000, + b="256k")) + +aac_hifi = %ffmpeg(format="mpegts", + %audio(codec="aac", + channels=2, + ar=48000, + b="512k")) + + +streams = + [("aac_lofi", aac_lofi), ("aac_midfi", aac_midfi), ("aac_hifi", aac_hifi)] + + +# HLS Outputs [Each station] + +def create_hls_output(~name, source) = + output.file.hls( + playlist="#{name}.m3u8", + segment_duration=0.5, + segments=10, + segments_overhead=5, + persist_at="/nvme/pub/hls/#{name}/state.config", + "/nvme/pub/hls/#{name}", + streams, + source + ) +end + +create_hls_output(name="main", mksafe(main_source)) + +create_hls_output(name="rock", mksafe(rock_source)) +create_hls_output(name="electronic", mksafe(electronic_source)) +create_hls_output(name="rap", mksafe(rap_source)) + +#output.file.hls( +# playlist="classical.m3u8", +# segment_duration=0.45, +# segments=9, +# segments_overhead=3, +# persist_at="/nvme/pub/hls/classical_state.config", +# "/nvme/pub/hls/classical", +# streams, +# mksafe(classical_source) +#) + +create_hls_output(name="pop", mksafe(pop_source)) + +# HTTP Server + +def get_next_http(~protocol,~data,~headers,uri) = + source = + if data == "main" then main_source + elsif data == "rock" then rock_source + elsif data == "electronic" then electronic_source + elsif data == "rap" then rap_source + elsif data == "pop" then pop_source + else null() end + + if source != null() then + source.skip(source) + http.response( + protocol=protocol, + code=200, + data="OK #{data}" + ) +end + +harbor.http.register(port=29000, method="POST", "/next", get_next_http) + +# EOF diff --git a/test/liquidsoap.ls b/test/liquidsoap.ls new file mode 100644 index 0000000..b6ffbb6 --- /dev/null +++ b/test/liquidsoap.ls @@ -0,0 +1,270 @@ +#!/usr/bin/liquidsoap +set("log.file.path", "/home/kyle/.lsl.txt") +set("log.stdout", true) +set("harbor.bind_addrs", ["127.0.0.1"]) + + +# Get next track dynamically [Each station] + +def get_next_main() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py main")) + [request.create(uri)] +end + +def get_next_rock() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py rock")) + [request.create(uri)] +end + +def get_next_electronic() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py electronic")) + [request.create(uri)] +end + +def get_next_rap() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py rap")) + [request.create(uri)] +end + +#def get_next_classical() = +# uri = list.hd(default="", process.read.lines("uv run get_next_track.py classical")) +# [request.create(uri)] +#end + +def get_next_pop() = + uri = list.hd(default="", process.read.lines("uv run get_next_track.py pop")) + [request.create(uri)] +end + + +# Set up queues [Each station] + +main_list = request.dynamic.list( + id="requests", + get_next_main, + prefetch=0 +) + + +rock_list = request.dynamic.list( + id="rock_requests", + get_next_rock, + prefetch=0 +) + +electronic_list = request.dynamic.list( + id="electronic_requests", + get_next_electronic, + prefetch=0 +) + +rap_list = request.dynamic.list( + id="rap_requests", + get_next_rap, + prefetch=0 +) + +#classical_list = request.dynamic.list( +# id="classical_requests", +# get_next_classical, +# prefetch=0 +#) + +pop_list = request.dynamic.list( + id="pop_requests", + get_next_pop, + prefetch=0 +) + + +# Standard + +silence = single("/home/kyle/ls/silence.ogg") + +# Queue [Each station] + +def main_queue(remaining, _) = + log("MAIN: Queueing with #{remaining} seconds remaining") + if not main_list.fetch() then + log("Fetching next query failed") + end +end + +def rock_queue(remaining, _) = + log("ROCK: Queueing with #{remaining} seconds remaining") + if not rock_list.fetch() then + log("Fetching next query failed") + end +end + +def electronic_queue(remaining, _) = + log("ELECTRONIC: Queueing with #{remaining} seconds remaining") + if not electronic_list.fetch() then + log("Fetching next query failed") + end +end + +def rap_queue(remaining, _) = + log("RAP: Queueing with #{remaining} seconds remaining") + if not rap_list.fetch() then + log("Fetching next query failed") + end +end + +#def classical_queue(remaining, _) = +# log("CLASSICAL: Queueing with #{remaining} seconds remaining") +# if not classical_list.fetch() then +# log("Fetching next query failed") +# end +#end + +def pop_queue(remaining, _) = + log("POP: Queueing with #{remaining} seconds remaining") + if not pop_list.fetch() then + log("Fetching next query failed") + end +end + + + +# Initial fetch [Each station] + +main_list.fetch() +rock_list.fetch() +electronic_list.fetch() +rap_list.fetch() +#classical_list.fetch() +pop_list.fetch() + +# Source setup [Each station] + +main_source = source.on_end(delay=1.0, main_list, main_queue) +rock_source = source.on_end(delay=1.0, rock_list, rock_queue) +electronic_source = source.on_end(delay=1.0, electronic_list, electronic_queue) +rap_source = source.on_end(delay=1.0, rap_list, rap_queue) +#classical_source = source.on_end(delay=1.0, classical_list, classical_queue) +pop_source = source.on_end(delay=1.0, pop_list, pop_queue) + +all_tracks_main = fallback(track_sensitive=false, [main_source, silence]) +all_tracks_rock = fallback(track_sensitive=false, [rock_source, silence]) +all_tracks_electronic = fallback(track_sensitive=false, [electronic_source, silence]) +all_tracks_rap = fallback(track_sensitive=false, [rap_source, silence]) +#all_tracks_classical = fallback(track_sensitive=false, [classical_source, silence]) +all_tracks_pop = fallback(track_sensitive=false, [pop_source, silence]) + +# HLS Setup [Standard] + +aac_lofi = + %ffmpeg(format = "mpegts", %audio(codec = "aac", channels = 2, ar = 44100)) + +aac_midfi = + %ffmpeg( + format = "mpegts", + %audio(codec = "aac", channels = 2, ar = 44100, b = "96k") + ) + +aac_hifi = + %ffmpeg( + format = "mpegts", + %audio(codec = "aac", channels = 2, ar = 44100, b = "448k") + ) + + +streams = + [("aac_lofi", aac_lofi), ("aac_midfi", aac_midfi), ("aac_hifi", aac_hifi)] + + +# HLS Outputs [Each station] + +output.file.hls( + playlist="main.m3u8", + segment_duration=0.5, + segments=9, + segments_overhead=4, + persist_at="/nvme/pub/hls/state.config", + "/nvme/pub/hls/main", + streams, + mksafe(main_source) +) + +output.file.hls( + playlist="rock.m3u8", + segment_duration=0.5, + segments=9, + segments_overhead=4, + persist_at="/nvme/pub/hls/rock/state.config", + "/nvme/pub/hls/rock", + streams, + mksafe(rock_source) +) + +output.file.hls( + playlist="electronic.m3u8", + segment_duration=0.5, + segments=9, + segments_overhead=4, + persist_at="/nvme/pub/hls/electronic/state.config", + "/nvme/pub/hls/electronic", + streams, + mksafe(electronic_source) +) + +output.file.hls( + playlist="rap.m3u8", + segment_duration=0.5, + segments=9, + segments_overhead=4, + persist_at="/nvme/pub/hls/rap_state.config", + "/nvme/pub/hls/rap", + streams, + mksafe(rap_source) +) + +#output.file.hls( +# playlist="classical.m3u8", +# segment_duration=0.45, +# segments=9, +# segments_overhead=3, +# persist_at="/nvme/pub/hls/classical_state.config", +# "/nvme/pub/hls/classical", +# streams, +# mksafe(classical_source) +#) + +output.file.hls( + playlist="pop.m3u8", + segment_duration=0.5, + segments=9, + segments_overhead=4, + persist_at="/nvme/pub/hls/pop_state.config", + "/nvme/pub/hls/pop", + streams, + mksafe(pop_source) +) + +# HTTP Server + +def get_next_http(~protocol,~data,~headers,uri) = + if data == "main" then + _req = source.skip(main_source) + elsif data == "rock" then + _req = source.skip(rock_source) + elsif data == "electronic" then + _req = source.skip(electronic_source) + elsif data == "rap" then + _req = source.skip(rap_source) +#elsif data == "classical" then +# _req = source.skip(classical_source) + elsif data == "pop" then + _req = source.skip(pop_source) + end + http.response( + protocol=protocol, + code=200, + data="OK #{data}" + ) +end + +harbor.http.register(port=29000, method="POST", "/next", get_next_http) + +# EOF diff --git a/test/minimal_test.liq b/test/minimal_test.liq new file mode 100644 index 0000000..e69de29 diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py index 368781c..dfc4107 100644 --- a/utils/sr_wrapper.py +++ b/utils/sr_wrapper.py @@ -786,7 +786,7 @@ class SRUtil: best_track, min_diff = tracks_with_diff[0] logging.info(f"SR: Best match duration diff: {min_diff}s") # If the closest match is more than 5 seconds off, consider no match - if min_diff > 5: + if min_diff > 10: logging.info("SR: Duration diff too large, no match") return None else: