Compare commits


14 Commits

Author SHA1 Message Date
6240888ac5 cull transcriptions endpoints from README.md 2025-12-03 13:56:28 -05:00
be9ed777a5 retire transcriptions endpoints 2025-12-03 13:55:52 -05:00
7779049c93 misc / sr_wrapper-- only consider an album returned from tidal to be a duplicate if both the name AND release date match. 2025-11-26 15:09:43 -05:00
41d9065c79 rm 2025-11-25 13:08:20 -05:00
85298b861d minor 2025-11-25 13:06:07 -05:00
476c4e6e51 bugfix: LRCLIb's models.py needs load_dotenv 2025-11-23 07:12:38 -05:00
353f14c899 formatting / minor 2025-11-22 21:43:48 -05:00
3d0b867427 Misc 2025-11-22 21:41:12 -05:00
dcc6c7b24e More progress re: #34
- Change of direction, LRCLib searches from /lyric/search now use internal cache - which is a PGSQL import of the LRCLib SQLite database.  Change to PGSQL was made for performance.
2025-11-22 13:13:03 -05:00
c302b256d3 Begin #34 2025-11-21 12:29:12 -05:00
c6d2bad79d Enhance lyric search functionality by improving line splitting logic and adding multi-line matching for subsearch. Update cache handling to ensure confidence threshold is respected before returning results. 2025-10-24 13:40:55 -04:00
25d1ab226e Enhance Cync authentication flow with improved token management and 2FA handling. Add periodic token validation and logging for better debugging. Introduce FLAC stream check in bulk download process. 2025-10-22 06:55:37 -04:00
3f66223328 Fix import statement for index_definition in redis_cache.py and radio_util.py (dependency upgrade related-- redis module) 2025-10-15 12:13:03 -04:00
c493f2aabf Increase rate limit for lighting state requests and enhance error handling for Cync device operations. Improve lyric search processing by splitting lyrics based on line breaks and cleaning special characters (bugfix for subsearch/seek). 2025-10-15 10:10:56 -04:00
17 changed files with 1069 additions and 601 deletions

.github/copilot-instructions.md


@@ -51,10 +51,6 @@ This server is built with [FastAPI](https://fastapi.tiangolo.com/) and provides
 **YouTube (`/yt/`)** 🎥
 - `POST /yt/search` - Search for YouTube videos by title
-**Transcriptions (`/transcriptions/`)** 📝
-- `POST /transcriptions/get_episodes` - Get episode list by show ID
-- `POST /transcriptions/get_episode_lines` - Get transcript lines for specific episodes
 **Lyric Search (`/lyric/`)** 🎵
 - `POST /lyric/search` - Search for song lyrics across multiple sources
 - Supports artist/song search, text search within lyrics


@@ -36,6 +36,7 @@ origins = [
"https://status.boatson.boats", "https://status.boatson.boats",
"https://_new.codey.lol", "https://_new.codey.lol",
"http://localhost:4321", "http://localhost:4321",
"https://local.codey.lol:4321",
] ]
app.add_middleware( app.add_middleware(
@@ -102,9 +103,6 @@ routes: dict = {
"randmsg": importlib.import_module("endpoints.rand_msg").RandMsg( "randmsg": importlib.import_module("endpoints.rand_msg").RandMsg(
app, util, constants app, util, constants
), ),
"transcriptions": importlib.import_module(
"endpoints.transcriptions"
).Transcriptions(app, util, constants),
"lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch( "lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch(
app, util, constants app, util, constants
), ),


@@ -3,6 +3,9 @@ import json
 import os
 import time
 import aiohttp
+import asyncio
+import traceback
+from datetime import datetime
 from fastapi import FastAPI, Depends, HTTPException, Request
 from fastapi_throttle import RateLimiter
 from fastapi.responses import JSONResponse
@@ -14,10 +17,75 @@ from pycync.user import User # type: ignore
 from pycync.cync import Cync as Cync  # type: ignore
 from pycync import Auth  # type: ignore
 from pycync.exceptions import TwoFactorRequiredError, AuthFailedError  # type: ignore
import inspect
import getpass
from typing import Optional
# Configure logging to write to a file for specific events
logging.basicConfig(
filename="cync_auth_events.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
def _mask_token(token: Optional[str]) -> str:
"""Mask sensitive token data for logging, showing only first/last 4 chars."""
if not token or len(token) < 8:
return "<invalid_token>"
return f"{token[:4]}...{token[-4:]}"
def _log_token_state(user, context: str):
"""Log masked token state for debugging."""
if not user:
logging.info(f"{context} - No user object available")
return
try:
logging.info(
f"{context} - Token state: access=%s refresh=%s expires_at=%s",
_mask_token(getattr(user, "access_token", None)),
_mask_token(getattr(user, "refresh_token", None)),
getattr(user, "expires_at", None),
)
except Exception as e:
logging.error(f"Error logging token state: {e}")
class Lighting(FastAPI): class Lighting(FastAPI):
async def ensure_cync_connection(self): async def _close_session_safely(self):
"""Safely close the current session if it exists."""
if self.session and not getattr(self.session, "closed", True):
try:
await self.session.close()
# Wait a bit for the underlying connections to close
await asyncio.sleep(0.1)
except Exception as e:
logging.warning(f"Error closing session: {e}")
self.session = None
self.auth = None
self.cync_api = None
async def _test_connection_health(self) -> bool:
"""Test if the current connection is healthy by making a simple API call."""
if (
not self.cync_api
or not self.session
or getattr(self.session, "closed", True)
):
return False
try:
# Make a simple API call to test connectivity
devices = self.cync_api.get_devices()
# Just check if we get a response without errors
return devices is not None
except Exception as e:
logging.warning(f"Connection health check failed: {e}")
return False
async def ensure_cync_connection(self, force_reconnect: bool = False):
"""Ensure aiohttp session and Cync API are alive, re-create if needed.""" """Ensure aiohttp session and Cync API are alive, re-create if needed."""
# Check required environment variables # Check required environment variables
missing_vars = [] missing_vars = []
@@ -31,58 +99,168 @@ class Lighting(FastAPI):
raise Exception( raise Exception(
f"Missing required environment variables: {', '.join(missing_vars)}" f"Missing required environment variables: {', '.join(missing_vars)}"
) )
# Cast to str after check to silence linter # Cast to str after check to silence linter
cync_email: str = self.cync_email # type: ignore cync_email: str = self.cync_email # type: ignore
cync_password: str = self.cync_password # type: ignore cync_password: str = self.cync_password # type: ignore
# Check if session is closed or missing # If force_reconnect is True or connection is unhealthy, rebuild everything
if not self.session or getattr(self.session, "closed", False): if force_reconnect or not await self._test_connection_health():
self.session = aiohttp.ClientSession() logging.info(
"Connection unhealthy or force reconnect requested. Rebuilding connection..."
)
# Clean up existing connection
await self._close_session_safely()
# Create new session with timeout configuration
timeout = aiohttp.ClientTimeout(total=30, connect=10)
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=30,
ttl_dns_cache=300,
use_dns_cache=True,
keepalive_timeout=60,
enable_cleanup_closed=True,
)
self.session = aiohttp.ClientSession(timeout=timeout, connector=connector)
# Load cached token and check validity
self.cync_user = None
cached_user = self._load_cached_user() cached_user = self._load_cached_user()
if cached_user: token_status = None
if cached_user and hasattr(cached_user, "expires_at"):
# Add buffer time - consider token expired if less than 5 minutes remaining
buffer_time = 300 # 5 minutes
if cached_user.expires_at > (time.time() + buffer_time):
token_status = "valid"
else:
token_status = "expired"
else:
token_status = "no cached user or missing expires_at"
logging.info(f"Cync token status: {token_status}")
if token_status == "valid" and cached_user is not None:
# Use cached token
self.auth = Auth( self.auth = Auth(
session=self.session, session=self.session,
user=cached_user, user=cached_user,
username=cync_email, username=cync_email,
password=cync_password, password=cync_password,
) )
self.cync_user = cached_user
logging.info("Reusing valid cached token, no 2FA required.")
else: else:
self.auth = Auth( # Need fresh login - clear any cached user that's expired
session=self.session, username=cync_email, password=cync_password if token_status == "expired":
)
# Try to refresh token
self.cync_user = None
if (
self.auth.user
and hasattr(self.auth.user, "expires_at")
and self.auth.user.expires_at > time.time()
):
try: try:
await self.auth.async_refresh_user_token() os.remove(self.token_cache_path)
self.cync_user = self.auth.user logging.info("Removed expired token cache")
self._save_cached_user(self.cync_user) except (OSError, FileNotFoundError):
except AuthFailedError:
pass pass
# If no valid token, login
if not self.cync_user: logging.info("Initializing new Auth instance...")
try: self.auth = Auth(
self.cync_user = await self.auth.login() session=self.session,
self._save_cached_user(self.cync_user) username=cync_email,
except TwoFactorRequiredError: password=cync_password,
logging.error(
"Cync 2FA required. Set CYNC_2FA_CODE in env if needed."
) )
try:
logging.info("Attempting fresh login...")
self.cync_user = await self.auth.login()
_log_token_state(self.cync_user, "After fresh login")
self._save_cached_user(self.cync_user)
logging.info("Fresh login successful")
except TwoFactorRequiredError:
twofa_code = os.getenv("CYNC_2FA_CODE")
if not twofa_code:
print("Cync 2FA required. Please enter your code:")
twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
self.cync_user = await self.auth.login(
two_factor_code=twofa_code
)
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
logging.info(
"2FA failure details: Code=%s, User=%s",
twofa_code,
self.cync_user,
)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.") raise Exception("Cync 2FA required.")
except AuthFailedError as e: except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e) logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.") raise Exception("Cync authentication failed.")
# Create new Cync API instance
try:
logging.info("Creating Cync API instance...")
_log_token_state(self.auth.user, "Before Cync.create")
self.cync_api = await Cync.create(self.auth) self.cync_api = await Cync.create(self.auth)
# Also check if cync_api is None (shouldn't happen, but just in case) logging.info("Cync API connection established successfully")
if not self.cync_api: except Exception as e:
if not self.auth: logging.error("Failed to create Cync API instance")
logging.critical("self.auth: %s", self.auth) logging.error("Exception details: %s", str(e))
return logging.error("Traceback:\n%s", traceback.format_exc())
self.cync_api = await Cync.create(self.auth)
# Save diagnostic info
diagnostic_data = {
"timestamp": datetime.now().isoformat(),
"error_type": type(e).__name__,
"error_message": str(e),
"auth_state": {
"has_auth": bool(self.auth),
"has_user": bool(getattr(self.auth, "user", None)),
"user_state": {
"access_token": _mask_token(
getattr(self.auth.user, "access_token", None)
)
if self.auth and self.auth.user
else None,
"refresh_token": _mask_token(
getattr(self.auth.user, "refresh_token", None)
)
if self.auth and self.auth.user
else None,
"expires_at": getattr(self.auth.user, "expires_at", None)
if self.auth and self.auth.user
else None,
}
if self.auth and self.auth.user
else None,
},
}
diagnostic_file = f"cync_api_failure-{int(time.time())}.json"
try:
with open(diagnostic_file, "w") as f:
json.dump(diagnostic_data, f, indent=2)
logging.info(
f"Saved API creation diagnostic data to {diagnostic_file}"
)
except Exception as save_error:
logging.error(f"Failed to save diagnostic data: {save_error}")
raise
# Final validation
if (
not self.cync_api
or not self.session
or getattr(self.session, "closed", True)
):
logging.error("Connection validation failed after setup")
_log_token_state(
getattr(self.auth, "user", None), "Failed connection validation"
)
raise Exception("Failed to establish proper Cync connection")
""" """
Lighting Endpoints Lighting Endpoints
@@ -108,6 +286,7 @@ class Lighting(FastAPI):
self.auth = None self.auth = None
self.cync_user = None self.cync_user = None
self.cync_api = None self.cync_api = None
self.health_check_task: Optional[asyncio.Task] = None
# Set up Cync connection at startup using FastAPI event # Set up Cync connection at startup using FastAPI event
@app.on_event("startup") @app.on_event("startup")
@@ -125,76 +304,193 @@ class Lighting(FastAPI):
f"Missing required environment variables: {', '.join(missing_vars)}" f"Missing required environment variables: {', '.join(missing_vars)}"
) )
self.session = aiohttp.ClientSession() # Use ensure_cync_connection which has proper token caching
cached_user = self._load_cached_user()
if cached_user:
self.auth = Auth(
session=self.session,
user=cached_user,
username=self.cync_email or "",
password=self.cync_password or "",
)
else:
self.auth = Auth(
session=self.session,
username=self.cync_email or "",
password=self.cync_password or "",
)
# Try to refresh token
if (
self.auth.user
and hasattr(self.auth.user, "expires_at")
and self.auth.user.expires_at > time.time()
):
try: try:
await self.auth.async_refresh_user_token() await self.ensure_cync_connection()
self.cync_user = self.auth.user logging.info("Cync lighting system initialized successfully")
self._save_cached_user(self.cync_user) except Exception as e:
except AuthFailedError: logging.error(f"Failed to initialize Cync connection at startup: {e}")
pass # Don't raise - allow server to start, connection will be retried on first request
# If no valid token, login
if not self.cync_user:
try:
self.cync_user = await self.auth.login()
self._save_cached_user(self.cync_user)
except TwoFactorRequiredError:
logging.error(
"Cync 2FA required. Set CYNC_2FA_CODE in env if needed."
)
raise Exception("Cync 2FA required.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
# Create persistent Cync API object
self.cync_api = await Cync.create(self.auth)
# Schedule periodic token validation and connection health checks
self.health_check_task = asyncio.create_task(self._schedule_health_checks())
@app.on_event("shutdown")
async def shutdown_event():
# Cancel health check task
if self.health_check_task and not self.health_check_task.done():
self.health_check_task.cancel()
try:
await self.health_check_task
except asyncio.CancelledError:
logging.info("Health check task cancelled successfully")
pass
# Clean up connections
await self._close_session_safely()
logging.info("Cync lighting system shut down cleanly")
# Register endpoints
self.endpoints: dict = { self.endpoints: dict = {
"lighting/state": self.get_lighting_state, "lighting/state": self.get_lighting_state,
} }
for endpoint, handler in self.endpoints.items(): for endpoint, handler in self.endpoints.items():
app.add_api_route( self.app.add_api_route(
f"/{endpoint}", f"/{endpoint}",
handler, handler,
methods=["GET"], methods=["GET"],
include_in_schema=True, include_in_schema=True,
dependencies=[ dependencies=[
Depends(RateLimiter(times=10, seconds=2)), Depends(RateLimiter(times=25, seconds=2)),
Depends(get_current_user), Depends(get_current_user),
], ],
) )
app.add_api_route( self.app.add_api_route(
"/lighting/state", "/lighting/state",
self.set_lighting_state, self.set_lighting_state,
methods=["POST"], methods=["POST"],
include_in_schema=True, include_in_schema=True,
dependencies=[ dependencies=[
Depends(RateLimiter(times=10, seconds=2)), Depends(RateLimiter(times=25, seconds=2)),
Depends(get_current_user), Depends(get_current_user),
], ],
) )
async def _refresh_or_login(self):
if not self.auth:
logging.error("Auth object is not initialized.")
raise Exception("Cync authentication not initialized.")
try:
user = getattr(self.auth, "user", None)
_log_token_state(user, "Before refresh attempt")
if user and hasattr(user, "expires_at") and user.expires_at > time.time():
refresh = getattr(self.auth, "async_refresh_user_token", None)
if callable(refresh):
try:
logging.info("Attempting token refresh...")
result = refresh()
if inspect.isawaitable(result):
await result
logging.info(
"Token refresh completed successfully (awaited)"
)
else:
logging.info("Token refresh completed (non-awaitable)")
except AuthFailedError as e:
logging.error("Token refresh failed with AuthFailedError")
logging.error("Exception details: %s", str(e))
logging.error("Traceback:\n%s", traceback.format_exc())
# Save diagnostic info to file
diagnostic_data = {
"timestamp": datetime.now().isoformat(),
"error_type": "AuthFailedError",
"error_message": str(e),
"user_state": {
"access_token": _mask_token(
getattr(user, "access_token", None)
),
"refresh_token": _mask_token(
getattr(user, "refresh_token", None)
),
"expires_at": getattr(user, "expires_at", None),
},
}
try:
diagnostic_file = (
f"cync_auth_failure-{int(time.time())}.json"
)
with open(diagnostic_file, "w") as f:
json.dump(diagnostic_data, f, indent=2)
logging.info(f"Saved diagnostic data to {diagnostic_file}")
except Exception as save_error:
logging.error(
f"Failed to save diagnostic data: {save_error}"
)
raise
login = getattr(self.auth, "login", None)
if callable(login):
try:
result = login()
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in successfully.")
except TwoFactorRequiredError:
twofa_code = os.getenv("CYNC_2FA_CODE")
if not twofa_code:
# Prompt interactively if not set
print("Cync 2FA required. Please enter your code:")
twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
result = login(two_factor_code=twofa_code)
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
logging.info(
"2FA failure details: Code=%s, User=%s",
twofa_code,
self.cync_user,
)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.")
else:
raise Exception("Auth object missing login method.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
except Exception as e:
logging.error("Unexpected error during authentication: %s", e)
raise
async def _schedule_health_checks(self):
"""Periodic health checks and token validation."""
while True:
try:
await asyncio.sleep(300) # Check every 5 minutes
# Check token expiration (refresh if less than 10 minutes left)
if self.cync_user and hasattr(self.cync_user, "expires_at"):
expires_at = getattr(self.cync_user, "expires_at", 0)
time_until_expiry = expires_at - time.time()
if time_until_expiry < 600: # Less than 10 minutes
logging.info(
f"Token expires in {int(time_until_expiry / 60)} minutes. Refreshing..."
)
try:
await self._refresh_or_login()
except Exception as e:
logging.error(
f"Token refresh failed during health check: {e}"
)
# Test connection health
if not await self._test_connection_health():
logging.warning(
"Connection health check failed. Will reconnect on next API call."
)
except asyncio.CancelledError:
logging.info("Health check task cancelled")
break
except Exception as e:
logging.error(f"Error during periodic health check: {e}")
# Continue the loop even on errors
def _load_cached_user(self): def _load_cached_user(self):
try: try:
if os.path.exists(self.token_cache_path): if os.path.exists(self.token_cache_path):
@@ -253,8 +549,10 @@ class Lighting(FastAPI):
""" """
Set the lighting state and apply it to the Cync device. Set the lighting state and apply it to the Cync device.
""" """
logging.info("=== LIGHTING STATE REQUEST RECEIVED ===")
try: try:
state = await request.json() state = await request.json()
logging.info(f"Requested state: {state}")
# Validate state (basic validation) # Validate state (basic validation)
if not isinstance(state, dict): if not isinstance(state, dict):
raise HTTPException( raise HTTPException(
@@ -266,7 +564,7 @@ class Lighting(FastAPI):
await self.ensure_cync_connection() await self.ensure_cync_connection()
# Apply to Cync device # Validate and extract state values
power = state.get("power", "off") power = state.get("power", "off")
if power not in ["on", "off"]: if power not in ["on", "off"]:
raise HTTPException( raise HTTPException(
@@ -296,14 +594,31 @@ class Lighting(FastAPI):
else: else:
rgb = None rgb = None
# Use persistent Cync API object # Apply to Cync device with robust retry and error handling
max_retries = 3
last_exception: Exception = Exception("No attempts made")
for attempt in range(max_retries):
try:
# Ensure connection before each attempt
force_reconnect = attempt > 0 # Force reconnect on retries
await self.ensure_cync_connection(force_reconnect=force_reconnect)
if not self.cync_api: if not self.cync_api:
raise HTTPException(status_code=500, detail="Cync API not initialized.") raise Exception("Cync API not available after connection setup")
devices = self.cync_api.get_devices()
if not devices or not isinstance(devices, (list, tuple)): logging.info(
raise HTTPException( f"Attempt {attempt + 1}/{max_retries}: Getting devices from Cync API..."
status_code=500, detail="No devices returned from Cync API."
) )
devices = self.cync_api.get_devices()
if not devices:
raise Exception("No devices returned from Cync API")
logging.info(
f"Devices returned: {[getattr(d, 'name', 'unnamed') for d in devices]}"
)
light = next( light = next(
( (
d d
@@ -312,25 +627,101 @@ class Lighting(FastAPI):
), ),
None, None,
) )
if not light: if not light:
raise HTTPException( available_devices = [
status_code=404, getattr(d, "name", "unnamed") for d in devices
detail=f"Device '{self.cync_device_name}' not found", ]
raise Exception(
f"Device '{self.cync_device_name}' not found. Available devices: {available_devices}"
) )
logging.info(
f"Selected device: {getattr(light, 'name', 'unnamed')}"
)
# Execute device operations
operations_completed = []
# Set power # Set power
if power == "on": if power == "on":
await light.turn_on() result = await light.turn_on()
operations_completed.append(f"turn_on: {result}")
else: else:
await light.turn_off() result = await light.turn_off()
operations_completed.append(f"turn_off: {result}")
# Set brightness # Set brightness
if "brightness" in state: if "brightness" in state:
await light.set_brightness(brightness) result = await light.set_brightness(brightness)
operations_completed.append(
f"set_brightness({brightness}): {result}"
)
# Set color # Set color
if rgb: if rgb:
await light.set_rgb(rgb) result = await light.set_rgb(rgb)
operations_completed.append(f"set_rgb({rgb}): {result}")
logging.info(
f"All operations completed successfully: {operations_completed}"
)
break # Success, exit retry loop
except (
aiohttp.ClientConnectionError,
aiohttp.ClientOSError,
aiohttp.ServerDisconnectedError,
aiohttp.ClientConnectorError,
ConnectionResetError,
ConnectionError,
OSError,
asyncio.TimeoutError,
) as e:
last_exception = e
logging.warning(
f"Connection/network error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
)
if attempt < max_retries - 1:
# Wait a bit before retry to allow network/server recovery
await asyncio.sleep(
2**attempt
) # Exponential backoff: 1s, 2s, 4s
continue
except (AuthFailedError, TwoFactorRequiredError) as e:
last_exception = e
logging.error(
f"Authentication error (attempt {attempt + 1}/{max_retries}): {e}"
)
if attempt < max_retries - 1:
# Clear cached tokens on auth errors
try:
os.remove(self.token_cache_path)
logging.info("Cleared token cache due to auth error")
except (OSError, FileNotFoundError):
pass
await asyncio.sleep(1)
continue
except Exception as e:
last_exception = e
error_msg = f"Unexpected error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
logging.error(error_msg)
# On unexpected errors, try reconnecting for next attempt
if attempt < max_retries - 1:
logging.warning(
"Forcing full reconnection due to unexpected error..."
)
await asyncio.sleep(1)
continue
# If we get here, all retries failed
logging.error(
f"All {max_retries} attempts failed. Last error: {type(last_exception).__name__}: {last_exception}"
)
raise last_exception
logging.info( logging.info(
"Successfully applied state to device '%s': %s", "Successfully applied state to device '%s': %s",


@@ -1,4 +1,3 @@
-import logging
 import os
 import urllib.parse
 import regex
@@ -81,9 +80,9 @@ class LyricSearch(FastAPI):
         )
         for endpoint, handler in self.endpoints.items():
-            times: int = 20
+            times: int = 5
             seconds: int = 2
-            rate_limit: tuple[int, int] = (2, 3)  # Default; (Times, Seconds)
+            rate_limit: tuple[int, int] = (times, seconds)  # Default; (Times, Seconds)
             _schema_include = endpoint in ["lyric/search"]
             if (
@@ -187,7 +186,7 @@ class LyricSearch(FastAPI):
             },
         )
-        excluded_sources: Optional[list] = data.excluded_sources
+        excluded_sources: list = data.excluded_sources or []
         aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
         plain_lyrics: bool = not data.lrc
         result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search(
@@ -210,29 +209,93 @@ class LyricSearch(FastAPI):
             if data.sub and not data.lrc:
                 seeked_found_line: Optional[int] = None
-                lyric_lines: list[str] = result["lyrics"].strip().split(" / ")
+                # Split lyrics into lines based on <br>, newline characters, or " / "
+                lyrics_text = result["lyrics"].strip()
+                # Determine the delimiter and split accordingly
+                if "<br>" in lyrics_text:
+                    lyric_lines = lyrics_text.split("<br>")
+                    separator = "<br>"
+                elif " / " in lyrics_text:
+                    lyric_lines = lyrics_text.split(" / ")
+                    separator = " / "
+                else:
+                    lyric_lines = lyrics_text.split("\n")
+                    separator = "\n"
+                search_term = data.sub.strip().lower()
+                # First try single-line matching (existing behavior)
                 for i, line in enumerate(lyric_lines):
-                    line = regex.sub(r"\u2064", "", line.strip())
-                    if data.sub.strip().lower() in line.strip().lower():
+                    # Remove any special characters and extra spaces
+                    cleaned_line = regex.sub(r"\u2064", "", line.strip())
+                    if search_term in cleaned_line.lower():
                         seeked_found_line = i
-                        logging.debug(
-                            "Found %s at %s, match for %s!",
-                            line,
-                            seeked_found_line,
-                            data.sub,
-                        )  # REMOVEME: DEBUG
                         break
-                if not seeked_found_line:
+                # If no single-line match found, try multi-line matching
+                if seeked_found_line is None:
# Try matching across consecutive lines (up to 5 lines for reasonable performance)
max_lines_to_check = min(5, len(lyric_lines))
for i in range(len(lyric_lines)):
for line_count in range(2, max_lines_to_check + 1):
if i + line_count <= len(lyric_lines):
# Combine consecutive lines with space separator
combined_lines = []
line_positions: list[
tuple[int, int]
] = [] # Track where each line starts in combined text
combined_text_parts: list[str] = []
for j in range(line_count):
if i + j < len(lyric_lines):
cleaned_line = regex.sub(
r"\u2064", "", lyric_lines[i + j].strip()
)
combined_lines.append(cleaned_line)
# Track position of this line in the combined text
line_start_pos = len(
" ".join(combined_text_parts).lower()
)
if line_start_pos > 0:
line_start_pos += (
1 # Account for space separator
)
line_positions.append((i + j, line_start_pos))
combined_text_parts.append(cleaned_line)
combined_text = " ".join(combined_lines).lower()
if search_term in combined_text:
# Find which specific line the match starts in
match_pos = combined_text.find(search_term)
# Find the line that contains the start of the match
actual_start_line = i # Default fallback
for line_idx, line_start_pos in line_positions:
if line_start_pos <= match_pos:
actual_start_line = line_idx
else:
break
seeked_found_line = actual_start_line
break
if seeked_found_line is not None:
break
+                if seeked_found_line is None:
                     return JSONResponse(
+                        status_code=500,
                         content={
                             "err": True,
                             "errorText": "Seek (a.k.a. subsearch) failed.",
                             "failed_seek": True,
                         },
                     )
-                result["lyrics"] = " / ".join(lyric_lines[seeked_found_line:])
+                # Only include lines strictly starting from the matched line
+                result["lyrics"] = separator.join(lyric_lines[seeked_found_line:])
                 result["confidence"] = int(result["confidence"])
                 result["time"] = f"{float(result['time']):.4f}"


@@ -307,7 +307,7 @@ class Radio(FastAPI):
         }
     )
-    async def album_art_handler(
+    def album_art_handler(
         self, request: Request, track_id: Optional[int] = None,
         station: Station = "main"
     ) -> Response:
@@ -371,6 +371,14 @@ class Radio(FastAPI):
         ret_obj.pop("file_path")
         return JSONResponse(content=ret_obj)
+
+    def _bg_cache_art(self, track_id: int, file_path: str):
+        try:
+            album_art = self.radio_util.get_album_art(track_id=track_id)
+            if not album_art:
+                self.radio_util.cache_album_art(track_id, file_path)
+        except Exception as e:
+            logging.error(f"Background art cache error: {e}")
+
     async def radio_get_next(
         self,
         data: ValidRadioNextRequest,
@@ -448,13 +456,7 @@ class Radio(FastAPI):
logging.info("radio_get_next Exception: %s", str(e)) logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc() traceback.print_exc()
try: background_tasks.add_task(self._bg_cache_art, next["id"], next["file_path"])
album_art = self.radio_util.get_album_art(track_id=next["id"])
if not album_art:
self.radio_util.cache_album_art(next["id"], next["file_path"])
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
return JSONResponse(content=next) return JSONResponse(content=next)
@@ -496,9 +498,13 @@ class Radio(FastAPI):
             },
         )
-        search: bool = self.radio_util.search_db(
+        loop = asyncio.get_running_loop()
+        search: bool = await loop.run_in_executor(
+            None,
+            lambda: self.radio_util.search_db(
                 artistsong=artistsong, artist=artist, song=song, station=data.station
             )
+        )
         if data.alsoSkip:
             await self.radio_util._ls_skip(data.station)
         return JSONResponse(content={"result": search})
@@ -764,9 +770,24 @@ class Radio(FastAPI):
logging.info(f"[LRC] Starting fetch for {station}: {artist} - {title}") logging.info(f"[LRC] Starting fetch for {station}: {artist} - {title}")
# Try SR first with timeout # Try LRCLib first with timeout
try: try:
async with asyncio.timeout(5.0): # 5 second timeout async with asyncio.timeout(10.0): # 10 second timeout
logging.info("[LRC] Trying LRCLib")
lrclib_result = await self.lrclib.search(artist, title, plain=False, raw=True)
if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
logging.info("[LRC] Found from LRCLib")
return lrclib_result.lyrics, "LRCLib"
except asyncio.TimeoutError:
logging.warning("[LRC] LRCLib fetch timed out")
except Exception as e:
logging.error(f"[LRC] LRCLib fetch error: {e}")
logging.info("[LRC] LRCLib fetch completed without results")
# Try SR as fallback with timeout
try:
async with asyncio.timeout(10.0): # 10 second timeout
lrc = await self.sr_util.get_lrc_by_artist_song( lrc = await self.sr_util.get_lrc_by_artist_song(
artist, title, duration=duration artist, title, duration=duration
) )
@@ -778,21 +799,6 @@ class Radio(FastAPI):
         except Exception as e:
             logging.error(f"[LRC] SR fetch error: {e}")
-        logging.info("[LRC] SR fetch completed without results")
-
-        # Try LRCLib as fallback with timeout
-        try:
-            async with asyncio.timeout(5.0):  # 5 second timeout
-                logging.info("[LRC] Trying LRCLib fallback")
-                lrclib_result = await self.lrclib.search(artist, title, plain=False)
-                if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
-                    logging.info("[LRC] Found from LRCLib")
-                    return lrclib_result.lyrics, "LRCLib"
-        except asyncio.TimeoutError:
-            logging.warning("[LRC] LRCLib fetch timed out")
-        except Exception as e:
-            logging.error(f"[LRC] LRCLib fetch error: {e}")
-
         logging.info("[LRC] No lyrics found from any source")
         return None, "None"
     except Exception as e:
except Exception as e: except Exception as e:
@@ -804,11 +810,21 @@ class Radio(FastAPI):
         try:
             async with self.lrc_cache_locks[station]:
                 self.lrc_cache.pop(station, None)
             lrc, source = await self._fetch_and_cache_lrc(station, track_json)
+            async with self.lrc_cache_locks[station]:
+                # Verify we are still on the same song
+                current_track = self.radio_util.now_playing.get(station)
+                if current_track and current_track.get("uuid") == track_json.get("uuid"):
                     if lrc:
                         self.lrc_cache[station] = lrc
                     else:
                         self.lrc_cache[station] = None
+                else:
+                    logging.info(f"[LRC] Discarding fetch result for {station} as track changed.")
+                    return
             if lrc:
                 await self.broadcast_lrc(station, lrc, source)
         except Exception as e:
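
The search_db change above offloads a blocking database call to the default thread pool so the event loop stays responsive. The general pattern, shown with a hypothetical blocking function in place of the real query:

import asyncio
import time

def blocking_lookup(term: str) -> bool:
    time.sleep(0.5)  # stands in for a synchronous DB query
    return term == "found"

async def main():
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default executor so the loop keeps serving requests
    result = await loop.run_in_executor(None, lambda: blocking_lookup("found"))
    print(result)

asyncio.run(main())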


@@ -20,6 +20,9 @@ from lyric_search.sources import private
 from typing import Literal
 from pydantic import BaseModel
+
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
 class ValidBulkFetchRequest(BaseModel):
     track_ids: list[int]


@@ -1,171 +0,0 @@
import os
import aiosqlite as sqlite3
from fastapi import FastAPI, Depends, Response
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union, Iterable, cast
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
class Transcriptions(FastAPI):
"""
Transcription Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None:
"""Initialize Transcriptions endpoints."""
self.app: FastAPI = app
self.util = util
self.constants = constants
self.endpoints: dict = {
"transcriptions/get_episodes": self.get_episodes_handler,
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
# tbd
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(
f"/{endpoint}",
handler,
methods=["POST"],
include_in_schema=True,
dependencies=[Depends(RateLimiter(times=2, seconds=2))],
)
async def get_episodes_handler(
self, data: ValidShowEpisodeListRequest
) -> JSONResponse:
"""
Get list of episodes by show ID.
Parameters:
- **data** (ValidShowEpisodeListRequest): Request containing show ID.
Returns:
- **JSONResponse**: Contains a list of episodes.
"""
show_id: int = data.s
db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None
show_title: Optional[str] = None
if not isinstance(show_id, int):
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Invalid request",
},
)
show_id = int(show_id)
if not (str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Show not found.",
},
)
match show_id:
case 0:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db")
db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
show_title = "South Park"
case 1:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Futurama"
case 2:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Parks And Rec"
case _:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Unknown error.",
},
)
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
return JSONResponse(
content={
"show_title": show_title,
"episodes": [
{
"id": item[1],
"ep_friendly": item[0],
}
for item in result
],
}
)
async def get_episode_lines_handler(
self, data: ValidShowEpisodeLineRequest
) -> Response:
"""
Get lines for a particular episode.
Parameters:
- **data** (ValidShowEpisodeLineRequest): Request containing show and episode ID.
Returns:
- **Response**: Episode lines.
"""
show_id: int = int(data.s)
episode_id: int = int(data.e)
match show_id:
case 0:
db_path: Union[str, LiteralString] = os.path.join(
"/usr/local/share", "sqlite_dbs", "sp.db"
)
db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
case 2:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
case _:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Unknown error",
},
)
async with sqlite3.connect(database=db_path, timeout=1) as _db:
params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
result_list = cast(list[sqlite3.Row], result)
if not result_list:
return Response(
status_code=404,
content="Not found",
)
first_result: sqlite3.Row = result_list[0]
return JSONResponse(
content={
"episode_id": episode_id,
"ep_friendly": first_result[0].strip(),
"lines": [
{
"speaker": item[1].strip(),
"line": item[2].strip(),
}
for item in result
],
}
)

lyric_search/models.py

@@ -0,0 +1,112 @@
"""
Database models for LRCLib lyrics cache.
"""
import os
import urllib.parse
from typing import Type, AsyncGenerator
from sqlalchemy import (
Column,
Integer,
String,
Float,
Boolean,
DateTime,
ForeignKey,
UniqueConstraint,
)
from sqlalchemy.orm import relationship, foreign
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from dotenv import load_dotenv
load_dotenv()
Base: Type[DeclarativeMeta] = declarative_base()
class Tracks(Base): # type: ignore
"""Tracks table - stores track metadata."""
__tablename__ = "tracks"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String, index=True)
name_lower = Column(String, index=True)
artist_name = Column(String, index=True)
artist_name_lower = Column(String, index=True)
album_name = Column(String)
album_name_lower = Column(String, index=True)
duration = Column(Float, index=True)
last_lyrics_id = Column(Integer, ForeignKey("lyrics.id"), index=True)
created_at = Column(DateTime)
updated_at = Column(DateTime)
# Relationships
lyrics = relationship(
"Lyrics",
back_populates="track",
foreign_keys=[last_lyrics_id],
primaryjoin="Tracks.id == foreign(Lyrics.track_id)",
)
# Constraints
__table_args__ = (
UniqueConstraint(
"name_lower",
"artist_name_lower",
"album_name_lower",
"duration",
name="uq_tracks",
),
)
class Lyrics(Base): # type: ignore
"""Lyrics table - stores lyrics content."""
__tablename__ = "lyrics"
id = Column(Integer, primary_key=True, autoincrement=True)
plain_lyrics = Column(String)
synced_lyrics = Column(String)
track_id = Column(Integer, ForeignKey("tracks.id"), index=True)
has_plain_lyrics = Column(Boolean, index=True)
has_synced_lyrics = Column(Boolean, index=True)
instrumental = Column(Boolean)
source = Column(String, index=True)
created_at = Column(DateTime, index=True)
updated_at = Column(DateTime)
# Relationships
track = relationship(
"Tracks",
back_populates="lyrics",
foreign_keys=[track_id],
primaryjoin=(Tracks.id == foreign(track_id)),
remote_side=Tracks.id,
)
# PostgreSQL connection - using environment variables
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432")
POSTGRES_DB = os.getenv("POSTGRES_DB", "lrclib")
POSTGRES_USER = os.getenv("POSTGRES_USER", "api")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
# URL-encode the password to handle special characters
encoded_password = urllib.parse.quote_plus(POSTGRES_PASSWORD)
DATABASE_URL: str = f"postgresql+asyncpg://{POSTGRES_USER}:{encoded_password}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
async_engine: AsyncEngine = create_async_engine(
DATABASE_URL, pool_size=20, max_overflow=10, pool_pre_ping=True, echo=False
)
AsyncSessionLocal = async_sessionmaker(bind=async_engine, expire_on_commit=False)
async def get_async_db():
"""Get async database session."""
async with AsyncSessionLocal() as session:
yield session
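
A minimal lookup against these models might look like the sketch below. It assumes the POSTGRES_* environment variables above are set and that the tracks/lyrics tables have already been populated by the LRCLib import; the artist and song strings are placeholders.

import asyncio
from sqlalchemy import select
from lyric_search.models import AsyncSessionLocal, Tracks, Lyrics

async def find_plain_lyrics(artist: str, song: str):
    # Exact match on the pre-lowered columns, mirroring the cache's fast path
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Tracks.artist_name, Tracks.name, Lyrics.plain_lyrics)
            .join(Lyrics, Tracks.id == Lyrics.track_id)
            .filter(
                Tracks.artist_name_lower == artist.strip().lower(),
                Tracks.name_lower == song.strip().lower(),
            )
            .limit(1)
        )
        return result.first()

if __name__ == "__main__":
    print(asyncio.run(find_plain_lyrics("Artist", "Song")))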


@@ -14,9 +14,7 @@ class Aggregate:
     Aggregate all source methods
     """
-    def __init__(self, exclude_methods=None) -> None:
-        if not exclude_methods:
-            exclude_methods: list = []
+    def __init__(self, exclude_methods: list = []) -> None:
         self.exclude_methods = exclude_methods
         self.redis_cache = redis_cache.RedisCache()
         self.notifier = notifier.DiscordNotifier()
@@ -70,14 +68,14 @@ class Aggregate:
             if plain:  # do not record LRC fails
                 try:
                     await self.redis_cache.increment_found_count("failed")
-                    self.notifier.send(
-                        "WARNING",
-                        f"Could not find {artist} - {song} via queried sources.",
-                    )
+                    # await self.notifier.send(
+                    #     "WARNING",
+                    #     f"Could not find {artist} - {song} via queried sources.",
+                    # )
                 except Exception as e:
                     traceback.print_exc()
                     logging.info("Could not increment redis failed counter: %s", str(e))
-                    self.notifier.send(
+                    await self.notifier.send(
                         f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
                         f"Could not increment redis failed counter: {str(e)}",
                     )


@@ -343,15 +343,19 @@ class Cache:
             )
         else:
             best_match = (result_tracks[0], 100)
-        if not best_match or confidence < 90:
+        if not best_match:
             return None
         (candidate, confidence) = best_match
+        if confidence < 90:
+            return None
         logging.info("Result found on %s", self.label)
         matched = self.get_matched(
             sqlite_rows=results,
             matched_candidate=candidate,
             confidence=confidence,
         )
+        if matched is None:
+            return None
         time_end: float = time.time()
         time_diff: float = time_end - time_start
         matched.time = time_diff


@@ -45,11 +45,11 @@ class Genius:
             Optional[LyricsResult]: The result, if found - None otherwise.
         """
         try:
-            artist = artist.strip().lower()
-            song = song.strip().lower()
+            artist_name = artist.strip().lower()
+            song_name = song.strip().lower()
             time_start: float = time.time()
-            logging.info("Searching %s - %s on %s", artist, song, self.label)
-            search_term: str = f"{artist}%20{song}"
+            logging.info("Searching %s - %s on %s", artist_name, song_name, self.label)
+            search_term: str = f"{artist_name}%20{song_name}"
             returned_lyrics: str = ""
             async with ClientSession() as client:
                 async with client.get(
@@ -100,10 +100,13 @@ class Genius:
                     )
                     for returned in possible_matches
                 ]
-                searched: str = f"{artist} - {song}"
-                best_match: tuple = self.matcher.find_best_match(
+                searched: str = f"{artist_name} - {song_name}"
+                best_match: Optional[tuple] = self.matcher.find_best_match(
                     input_track=searched, candidate_tracks=to_scrape
                 )
+                if not best_match:
+                    raise InvalidGeniusResponseException("No matching result")
                 logging.info("To scrape: %s", to_scrape)
                 ((scrape_stub, track), confidence) = best_match
                 scrape_url: str = f"{self.genius_url}{scrape_stub[1:]}"
@@ -157,8 +160,8 @@ class Genius:
                     returned_lyrics: str = self.datautils.scrub_lyrics(
                         returned_lyrics
                     )
-                    artist: str = track.split(" - ", maxsplit=1)[0]
-                    song: str = track.split(" - ", maxsplit=1)[1]
+                    artist = track.split(" - ", maxsplit=1)[0]
+                    song = track.split(" - ", maxsplit=1)[1]
                     logging.info("Result found on %s", self.label)
                     time_end: float = time.time()
                     time_diff: float = time_end - time_start


@@ -1,45 +1,41 @@
 import time
-import traceback
 import logging
-from typing import Optional, Union
-from aiohttp import ClientTimeout, ClientSession
-from tenacity import retry, stop_after_attempt, wait_fixed
+from typing import Optional
+from sqlalchemy.future import select
 from lyric_search import utils
 from lyric_search.constructors import LyricsResult
-from . import common, cache, redis_cache
-from lyric_search.constructors import InvalidLRCLibResponseException
+from lyric_search.models import Tracks, Lyrics, AsyncSessionLocal
+from . import redis_cache

 logger = logging.getLogger()
 log_level = logging.getLevelName(logger.level)

 class LRCLib:
-    """LRCLib Search Module"""
+    """LRCLib Search Module - Local PostgreSQL Database"""

     def __init__(self) -> None:
-        self.label: str = "LRCLib"
-        self.lrclib_url: str = "https://lrclib.net/api/search"
-        self.headers: dict = common.SCRAPE_HEADERS
-        self.timeout = ClientTimeout(connect=3, sock_read=8)
+        self.label: str = "LRCLib-Cache"
         self.datautils = utils.DataUtils()
         self.matcher = utils.TrackMatcher()
-        self.cache = cache.Cache()
         self.redis_cache = redis_cache.RedisCache()

-    @retry(stop=stop_after_attempt(2), wait=wait_fixed(0.5))
     async def search(
self, self,
artist: str, artist: str,
song: str, song: str,
plain: Optional[bool] = True, plain: Optional[bool] = True,
duration: Optional[int] = None, duration: Optional[int] = None,
raw: bool = False,
) -> Optional[LyricsResult]: ) -> Optional[LyricsResult]:
""" """
LRCLib Search LRCLib Local Database Search
Args: Args:
artist (str): the artist to search artist (str): the artist to search
song (str): the song to search song (str): the song to search
plain (bool): return plain lyrics (True) or synced lyrics (False)
duration (int): optional track duration for better matching
raw (bool): return raw LRC string instead of parsed object (only for synced)
Returns: Returns:
Optional[LyricsResult]: The result, if found - None otherwise. Optional[LyricsResult]: The result, if found - None otherwise.
""" """
@@ -47,128 +43,105 @@ class LRCLib:
artist = artist.strip().lower() artist = artist.strip().lower()
song = song.strip().lower() song = song.strip().lower()
time_start: float = time.time() time_start: float = time.time()
lrc_obj: Optional[list[dict]] = None
logging.info("Searching %s - %s on %s", artist, song, self.label) logging.info("Searching %s - %s on %s", artist, song, self.label)
input_track: str = f"{artist} - {song}" async with AsyncSessionLocal() as db:
returned_lyrics: str = ""
async with ClientSession() as client:
async with await client.get(
self.lrclib_url,
params={
"artist_name": artist,
"track_name": song,
**({"duration": duration} if duration else {}),
},
timeout=self.timeout,
headers=self.headers,
) as request:
request.raise_for_status()
text: Optional[str] = await request.text()
if not text:
raise InvalidLRCLibResponseException("No search response.")
if len(text) < 100:
raise InvalidLRCLibResponseException(
"Search response text was invalid (len < 100 chars.)"
)
search_data: Optional[Union[list, dict]] = await request.json()
if not isinstance(search_data, list | dict):
raise InvalidLRCLibResponseException("No JSON search data.")
# logging.info("Search Data:\n%s", search_data)
if not isinstance(search_data, list):
raise InvalidLRCLibResponseException("Invalid JSON.")
# Filter by duration if provided
if duration:
search_data = [
r
for r in search_data
if abs(r.get("duration", 0) - duration) <= 10
]
if plain:
possible_matches = [
(
x,
f"{result.get('artistName')} - {result.get('trackName')}",
)
for x, result in enumerate(search_data)
]
else:
logging.info(
"Limiting possible matches to only those with non-null syncedLyrics"
)
possible_matches = [
(
x,
f"{result.get('artistName')} - {result.get('trackName')}",
)
for x, result in enumerate(search_data)
if isinstance(result["syncedLyrics"], str)
]
best_match = None best_match = None
try:
match_result = self.matcher.find_best_match( # Try exact match first (fastest)
input_track, result = await db.execute(
possible_matches, # type: ignore select(
Tracks.artist_name,
Tracks.name,
Lyrics.plain_lyrics,
Lyrics.synced_lyrics,
) )
if match_result: .join(Lyrics, Tracks.id == Lyrics.track_id)
best_match = match_result[0] .filter(
except: # noqa Tracks.artist_name_lower == artist,
pass Tracks.name_lower == song,
)
.limit(1)
)
best_match = result.first()
# If no exact match, try prefix match (faster than full ILIKE)
if not best_match:
result = await db.execute(
select(
Tracks.artist_name,
Tracks.name,
Lyrics.plain_lyrics,
Lyrics.synced_lyrics,
)
.join(Lyrics, Tracks.id == Lyrics.track_id)
.filter(
Tracks.artist_name_lower.like(f"{artist}%"),
Tracks.name_lower.like(f"{song}%"),
)
.limit(1)
)
best_match = result.first()
# If still no match, try full ILIKE (slowest)
if not best_match:
result = await db.execute(
select(
Tracks.artist_name,
Tracks.name,
Lyrics.plain_lyrics,
Lyrics.synced_lyrics,
)
.join(Lyrics, Tracks.id == Lyrics.track_id)
.filter(
Tracks.artist_name_lower.ilike(f"%{artist}%"),
Tracks.name_lower.ilike(f"%{song}%"),
)
.limit(1)
)
best_match = result.first()
if not best_match: if not best_match:
return logging.info("No result found on %s", self.label)
best_match_id = best_match[0] return None
if not isinstance(search_data[best_match_id]["artistName"], str): returned_artist = best_match.artist_name
raise InvalidLRCLibResponseException( returned_song = best_match.name
f"Invalid JSON: Cannot find artistName key.\n{search_data}"
)
if not isinstance(search_data[best_match_id]["trackName"], str):
raise InvalidLRCLibResponseException(
f"Invalid JSON: Cannot find trackName key.\n{search_data}"
)
returned_artist: str = search_data[best_match_id]["artistName"]
returned_song: str = search_data[best_match_id]["trackName"]
if plain: if plain:
if not isinstance( if not best_match.plain_lyrics:
search_data[best_match_id]["plainLyrics"], str logging.info("No plain lyrics available on %s", self.label)
): return None
raise InvalidLRCLibResponseException( returned_lyrics = best_match.plain_lyrics
f"Invalid JSON: Cannot find plainLyrics key.\n{search_data}"
)
returned_lyrics: str = search_data[best_match_id]["plainLyrics"]
returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics) returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
lrc_obj = None
else:
if not best_match.synced_lyrics:
logging.info("No synced lyrics available on %s", self.label)
return None
returned_lyrics = best_match.synced_lyrics
if raw:
lrc_obj = returned_lyrics
else: else:
if not isinstance(
search_data[best_match_id]["syncedLyrics"], str
):
raise InvalidLRCLibResponseException(
f"Invalid JSON: Cannot find syncedLyrics key.\n{search_data}"
)
returned_lyrics: str = search_data[best_match_id][
"syncedLyrics"
]
lrc_obj = self.datautils.create_lrc_object(returned_lyrics) lrc_obj = self.datautils.create_lrc_object(returned_lyrics)
returned_track: str = f"{returned_artist} - {returned_song}"
# Calculate match confidence
input_track = f"{artist} - {song}"
returned_track = f"{returned_artist} - {returned_song}"
match_result = self.matcher.find_best_match( match_result = self.matcher.find_best_match(
input_track=input_track, candidate_tracks=[(0, returned_track)] input_track=input_track, candidate_tracks=[(0, returned_track)]
) )
if not match_result: if not match_result:
return # No suitable match found return None
_matched, confidence = match_result _matched, confidence = match_result
logging.info("Result found on %s", self.label) logging.info("Result found on %s", self.label)
time_end: float = time.time() time_end = time.time()
time_diff: float = time_end - time_start time_diff = time_end - time_start
matched = LyricsResult( matched = LyricsResult(
artist=returned_artist, artist=returned_artist,
song=returned_song, song=returned_song,
@@ -177,10 +150,10 @@ class LRCLib:
confidence=confidence, confidence=confidence,
time=time_diff, time=time_diff,
) )
await self.redis_cache.increment_found_count(self.label) await self.redis_cache.increment_found_count(self.label)
if plain:
await self.cache.store(matched)
return matched return matched
except Exception as e: except Exception as e:
logging.debug("Exception: %s", str(e)) logging.error("Exception in %s: %s", self.label, str(e))
traceback.print_exc() return None


@@ -13,7 +13,7 @@ from lyric_search import notifier
from lyric_search.constructors import LyricsResult from lyric_search.constructors import LyricsResult
import redis.asyncio as redis import redis.asyncio as redis
from redis.commands.search.query import Query # type: ignore from redis.commands.search.query import Query # type: ignore
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # type: ignore from redis.commands.search.index_definition import IndexDefinition, IndexType # type: ignore
from redis.commands.search.field import TextField, Field # type: ignore from redis.commands.search.field import TextField, Field # type: ignore
from redis.commands.json.path import Path # type: ignore from redis.commands.json.path import Path # type: ignore
from . import private from . import private
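
The import change above tracks redis-py's rename of redis.commands.search.indexDefinition to redis.commands.search.index_definition. The repo simply switched to the new path; if code had to run against both older and newer redis-py releases, a small compatibility shim (sketch only) could bridge the rename:

try:
    # newer redis-py releases use the snake_case module name
    from redis.commands.search.index_definition import IndexDefinition, IndexType
except ImportError:
    # older releases exposed the camelCase module name
    from redis.commands.search.indexDefinition import IndexDefinition, IndexType  # type: ignore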

View File

@@ -17,7 +17,7 @@ from rapidfuzz import fuzz
from endpoints.constructors import RadioException from endpoints.constructors import RadioException
import redis.asyncio as redis import redis.asyncio as redis
from redis.commands.search.query import Query # noqa from redis.commands.search.query import Query # noqa
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # noqa from redis.commands.search.index_definition import IndexDefinition, IndexType # noqa
from redis.commands.search.field import TextField # noqa from redis.commands.search.field import TextField # noqa
from redis.commands.json.path import Path # noqa from redis.commands.json.path import Path # noqa
from lyric_search.sources import private from lyric_search.sources import private
@@ -486,11 +486,11 @@ class RadioUtil:
) )
"""Loading Complete""" """Loading Complete"""
self.playlists_loaded = True
# Request skip from LS to bring streams current # Request skip from LS to bring streams current
for playlist in self.playlists: for playlist in self.playlists:
logging.info("Skipping: %s", playlist) logging.info("Skipping: %s", playlist)
await self._ls_skip(playlist) await self._ls_skip(playlist)
self.playlists_loaded = True
except Exception as e: except Exception as e:
logging.info("Playlist load failed: %s", str(e)) logging.info("Playlist load failed: %s", str(e))
traceback.print_exc() traceback.print_exc()
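
Moving self.playlists_loaded = True below the skip loop means the readiness flag is only raised after every playlist has had its _ls_skip request issued, so callers polling the flag no longer see "loaded" while streams are still being brought current. A minimal sketch of that ordering, with placeholder names (load_playlists, populate_playlists, and the radio object are illustrative stand-ins):

import logging
import traceback

async def load_playlists(radio) -> None:
    try:
        await radio.populate_playlists()       # placeholder for the loading step
        for playlist in radio.playlists:
            logging.info("Skipping: %s", playlist)
            await radio._ls_skip(playlist)     # bring each stream current first
        radio.playlists_loaded = True          # only now report "loaded"
    except Exception as e:
        logging.info("Playlist load failed: %s", str(e))
        traceback.print_exc()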

View File

@@ -45,6 +45,29 @@ load_dotenv()
sr = SRUtil() sr = SRUtil()
logger = logging.getLogger(__name__)
async def check_flac_stream(file_path):
"""Check if the given file contains a FLAC stream using ffprobe."""
cmd = [
"ffprobe",
"-v",
"error",
"-select_streams",
"a:0",
"-show_entries",
"stream=codec_name",
"-of",
"default=noprint_wrappers=1:nokey=1",
file_path,
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, _ = await process.communicate()
return b"flac" in stdout
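
check_flac_stream shells out to ffprobe and reports whether the first audio stream of the file is FLAC, so it requires ffprobe on PATH. A small usage sketch (the path is illustrative):

import asyncio

async def main() -> None:
    path = "/tmp/example.flac"  # illustrative path
    if await check_flac_stream(path):
        print("FLAC stream present")
    else:
        print("Not FLAC; skip or re-encode")

asyncio.run(main())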
# ---------- Discord helper ---------- # ---------- Discord helper ----------
async def discord_notify( async def discord_notify(
@@ -259,13 +282,16 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
) )
) )
async def process_tracks(): async def process_tracks(track_list):
per_track_meta = [] per_track_meta = []
all_final_files = [] all_final_files = []
all_artists = set() all_artists = set()
(ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True) (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
# Ensure aiohttp session is properly closed
async with aiohttp.ClientSession(headers=HEADERS) as session: async with aiohttp.ClientSession(headers=HEADERS) as session:
print(f"DEBUG: Starting process_tracks with {len(track_list)} tracks")
# Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil # Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil
async def _rate_limit_notify(exc: Exception): async def _rate_limit_notify(exc: Exception):
try: try:
@@ -285,6 +311,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
pass pass
total = len(track_list or []) total = len(track_list or [])
for i, track_id in enumerate(track_list or []): for i, track_id in enumerate(track_list or []):
print(f"DEBUG: Processing track {i + 1}/{total}: {track_id}")
track_info = { track_info = {
"track_id": str(track_id), "track_id": str(track_id),
"status": "Pending", "status": "Pending",
@@ -300,31 +327,53 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
track_info["attempts"] = attempt track_info["attempts"] = attempt
try: try:
sr.get_cover_by_album_id print(f"DEBUG: Getting downloadable for track {track_id}")
url = await sr.get_stream_url_by_track_id(track_id, quality) # Fetch downloadable (handles DASH and others)
if not url: downloadable = await sr._safe_api_call(
raise RuntimeError("No stream URL") sr.streamrip_client.get_downloadable,
str(track_id),
2 if quality == "FLAC" else 1,
retries=3,
)
parsed = urlparse(url) print(f"DEBUG: Got downloadable: {type(downloadable)}")
clean_path = unquote(parsed.path) if not downloadable:
ext = Path(clean_path).suffix or ".mp3" raise RuntimeError("No downloadable created")
ext = f".{downloadable.extension}"
tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}") tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")
async with session.get(url) as resp: print(f"DEBUG: Starting download to {tmp_file}")
resp.raise_for_status() # Download
with open(tmp_file, "wb") as f: print(f"TRACK {track_id}: Starting download")
async for chunk in resp.content.iter_chunked(64 * 1024): try:
f.write(chunk) await downloadable._download(
str(tmp_file), callback=lambda x=None: None
)
print(
f"TRACK {track_id}: Download method completed normally"
)
except Exception as download_e:
print(
f"TRACK {track_id}: Download threw exception: {download_e}"
)
raise
print(
f"DEBUG: Download completed, file exists: {tmp_file.exists()}"
)
if not tmp_file.exists():
raise RuntimeError(
f"Download completed but no file created: {tmp_file}"
)
print(f"DEBUG: Fetching metadata for track {track_id}")
# Metadata fetch
try: try:
md = await sr.get_metadata_by_track_id(track_id) or {} md = await sr.get_metadata_by_track_id(track_id) or {}
print(f"DEBUG: Metadata fetched: {bool(md)}")
except MetadataFetchError as me: except MetadataFetchError as me:
# Permanent metadata failure — notify and continue (mark track failed) # Permanent metadata failure — mark failed and break
msg = f"Metadata permanently failed for track {track_id}: {me}"
try:
send_log_to_discord(msg, "ERROR", target)
except Exception:
pass
track_info["status"] = "Failed" track_info["status"] = "Failed"
track_info["error"] = str(me) track_info["error"] = str(me)
per_track_meta.append(track_info) per_track_meta.append(track_info)
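
The rewritten loop no longer fetches a raw stream URL; it asks streamrip for a downloadable object, passing 2 for FLAC and 1 otherwise, then writes it to a temp file named after the object's reported extension. The sketch below condenses that flow using only the calls visible in this hunk (sr._safe_api_call, streamrip_client.get_downloadable, downloadable.extension, downloadable._download); the QUALITY_LEVEL dict is just an illustrative restatement of the inline conditional.

import uuid
from pathlib import Path

QUALITY_LEVEL = {"FLAC": 2}          # anything else falls back to 1, as above

async def fetch_track(sr, track_id: str, quality: str) -> Path:
    downloadable = await sr._safe_api_call(
        sr.streamrip_client.get_downloadable,
        str(track_id),
        QUALITY_LEVEL.get(quality, 1),
        retries=3,
    )
    if not downloadable:
        raise RuntimeError("No downloadable created")
    tmp_file = Path(f"/tmp/{uuid.uuid4().hex}.{downloadable.extension}")
    await downloadable._download(str(tmp_file), callback=lambda x=None: None)
    if not tmp_file.exists():
        raise RuntimeError(f"Download completed but no file created: {tmp_file}")
    return tmp_file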
@@ -333,6 +382,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.meta["progress"] = int(((i + 1) / total) * 100) job.meta["progress"] = int(((i + 1) / total) * 100)
job.save_meta() job.save_meta()
break break
artist_raw = md.get("artist") or "Unknown Artist" artist_raw = md.get("artist") or "Unknown Artist"
album_raw = md.get("album") or "Unknown Album" album_raw = md.get("album") or "Unknown Album"
title_raw = md.get("title") or f"Track {track_id}" title_raw = md.get("title") or f"Track {track_id}"
@@ -341,15 +391,19 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
album = sanitize_filename(album_raw) album = sanitize_filename(album_raw)
title = sanitize_filename(title_raw) title = sanitize_filename(title_raw)
print(f"TRACK {track_id}: Processing '{title}' by {artist}")
all_artists.add(artist) all_artists.add(artist)
album_dir = staging_root / artist / album album_dir = staging_root / artist / album
album_dir.mkdir(parents=True, exist_ok=True) album_dir.mkdir(parents=True, exist_ok=True)
final_file = ensure_unique_path(album_dir / f"{title}{ext}") final_file = ensure_unique_path(album_dir / f"{title}{ext}")
# Move file into final location first (tags will be updated on moved file) # Move to final location
print(f"TRACK {track_id}: Moving to final location...")
tmp_file.rename(final_file) tmp_file.rename(final_file)
print(f"TRACK {track_id}: File moved successfully")
# Try to fetch cover art via SRUtil (use album_id from metadata) # Fetch cover art
try: try:
album_field = md.get("album") album_field = md.get("album")
album_id = md.get("album_id") or ( album_id = md.get("album_id") or (
@@ -370,9 +424,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
else: else:
cover_url = md.get("cover_url") cover_url = md.get("cover_url")
# Embed tags + artwork using music_tag if available, falling back to mediafile tagging # Embed tags
embedded = False embedded = False
try: img_bytes = None
if cover_url: if cover_url:
try: try:
timeout = aiohttp.ClientTimeout(total=15) timeout = aiohttp.ClientTimeout(total=15)
@@ -383,7 +437,6 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
img_bytes = await img_resp.read() img_bytes = await img_resp.read()
else: else:
img_bytes = None img_bytes = None
# Notify Discord about failed cover download (HTTP error)
try: try:
send_log_to_discord( send_log_to_discord(
f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`", f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`",
@@ -394,7 +447,6 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
pass pass
except Exception as e: except Exception as e:
img_bytes = None img_bytes = None
# Notify Discord about exception during cover download
try: try:
send_log_to_discord( send_log_to_discord(
f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`", f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`",
@@ -403,16 +455,15 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
) )
except Exception: except Exception:
pass pass
else:
img_bytes = None
# Prefer music_tag if available (keeps compatibility with add_cover_art.py) # Try music_tag first
try: try:
from music_tag import load_file as mt_load_file # type: ignore from music_tag import load_file as mt_load_file # type: ignore
# Add validation for `mf` object
try: try:
mf = mt_load_file(str(final_file)) mf = mt_load_file(str(final_file))
# set basic tags if mf is not None:
if md.get("title"): if md.get("title"):
mf["title"] = md.get("title") mf["title"] = md.get("title")
if md.get("artist"): if md.get("artist"):
@@ -429,14 +480,15 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
mf["artwork"] = img_bytes mf["artwork"] = img_bytes
mf.save() mf.save()
embedded = True embedded = True
else:
logger.error("Failed to load file with music_tag.")
embedded = False
except Exception: except Exception:
embedded = False embedded = False
except Exception: except Exception:
embedded = False embedded = False
# If music_tag not available or failed, fallback to mediafile tagging
if not embedded: if not embedded:
# If we had a cover_url but no bytes, log a warning to Discord
try: try:
if cover_url and not img_bytes: if cover_url and not img_bytes:
send_log_to_discord( send_log_to_discord(
@@ -446,20 +498,22 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
) )
except Exception: except Exception:
pass pass
tag_with_mediafile(str(final_file), md)
except Exception:
# Ensure at least the basic tags are written
try: try:
tag_with_mediafile(str(final_file), md) tag_with_mediafile(str(final_file), md)
except Exception: except Exception:
pass pass
tmp_file = None
# Success
tmp_file = None
track_info["status"] = "Success" track_info["status"] = "Success"
track_info["file_path"] = str(final_file) track_info["file_path"] = str(final_file)
track_info["error"] = None track_info["error"] = None
all_final_files.append(final_file) all_final_files.append(final_file)
print(
f"TRACK {track_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%"
)
if job: if job:
job.meta["progress"] = int(((i + 1) / total) * 100) job.meta["progress"] = int(((i + 1) / total) * 100)
job.meta["tracks"] = per_track_meta + [track_info] job.meta["tracks"] = per_track_meta + [track_info]
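
Progress is reported by mutating job.meta and calling save_meta() after each track, so anything polling the RQ job sees the percentage and per-track status as they change. A minimal sketch of that pattern with the standard rq API:

from rq import get_current_job

def report_progress(i: int, total: int, tracks: list[dict]) -> None:
    job = get_current_job()
    if not job:
        return                      # running outside a worker, e.g. in tests
    job.meta["progress"] = int(((i + 1) / total) * 100)
    job.meta["tracks"] = tracks
    job.save_meta()                 # persist so pollers see the update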
@@ -469,7 +523,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
except aiohttp.ClientResponseError as e: except aiohttp.ClientResponseError as e:
msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}" msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}"
send_log_to_discord(msg, "WARNING", target) send_log_to_discord(msg, "WARNING", target)
if e.status == 429: if getattr(e, "status", None) == 429:
wait_time = min(60, 2**attempt) wait_time = min(60, 2**attempt)
await asyncio.sleep(wait_time) await asyncio.sleep(wait_time)
else: else:
@@ -662,7 +716,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
return loop.run_until_complete(process_tracks()) return loop.run_until_complete(process_tracks(track_list))
except Exception as e: except Exception as e:
send_log_to_discord( send_log_to_discord(
f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target
@@ -672,3 +726,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.save_meta() job.save_meta()
finally: finally:
loop.close() loop.close()
# Correct integration of FLAC stream check
async def process_tracks(track_list):
for i, track_id in enumerate(track_list or []):
combined_path = f"/tmp/{uuid.uuid4().hex}_combined.m4s" # Example path
if not await check_flac_stream(combined_path):
logger.error(f"No FLAC stream found in {combined_path}. Skipping file.")
continue
# Proceed with decoding pipeline
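
The trailing process_tracks stub above only illustrates the call site for check_flac_stream; in the running worker the check would have to be applied to the file that was actually downloaded, before it is moved into the staging tree. The sketch below is one way that could slot in; the helper name, the tmp_file/track_info bookkeeping, and the decision to delete the bad download are assumptions, not code from this commit.

from pathlib import Path

async def ensure_flac(tmp_file: Path, track_id: str, track_info: dict) -> bool:
    """Return True if the download is usable; mark it failed otherwise."""
    if await check_flac_stream(str(tmp_file)):
        return True
    logger.error("No FLAC stream found in %s; skipping track %s", tmp_file, track_id)
    track_info["status"] = "Failed"
    track_info["error"] = "Downloaded file is not FLAC"
    tmp_file.unlink(missing_ok=True)   # discard the bad download
    return False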

View File

@@ -19,15 +19,21 @@ class MetadataFetchError(Exception):
"""Raised when metadata fetch permanently fails after retries.""" """Raised when metadata fetch permanently fails after retries."""
# Suppress all logging output from this module and its children # Suppress noisy logging from this module and from the `streamrip` library
for name in [__name__, "utils.sr_wrapper"]: # We set propagate=False so messages don't bubble up to the root logger and
# attach a NullHandler where appropriate to avoid "No handler found" warnings.
for name in [__name__, "utils.sr_wrapper", "streamrip", "streamrip.client"]:
logger = logging.getLogger(name) logger = logging.getLogger(name)
logger.setLevel(logging.INFO) # Temporarily set to INFO for debugging LRC # Keep default level (or raise to WARNING) so non-important logs are dropped
try:
logger.setLevel(logging.WARNING)
except Exception:
pass
logger.propagate = False logger.propagate = False
for handler in logger.handlers: # Ensure a NullHandler is present so logs don't propagate and no missing-handler
handler.setLevel(logging.INFO) # warnings are printed when the package emits records.
# Also set the root logger to CRITICAL as a last resort (may affect global logging) if not any(isinstance(h, logging.NullHandler) for h in logger.handlers):
# logging.getLogger().setLevel(logging.CRITICAL) logger.addHandler(logging.NullHandler())
load_dotenv() load_dotenv()
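
The block above drops these loggers to WARNING, disables propagation, and attaches a NullHandler, so streamrip's chatter disappears by default. When debugging, the same loggers can be turned back up from calling code without editing this module; a small sketch using only the standard logging API:

import logging

# Re-enable verbose output for the streamrip client while debugging.
debug_handler = logging.StreamHandler()
debug_handler.setLevel(logging.DEBUG)
for name in ("streamrip", "streamrip.client", "utils.sr_wrapper"):
    noisy = logging.getLogger(name)
    noisy.setLevel(logging.DEBUG)
    noisy.addHandler(debug_handler)
    noisy.propagate = False   # keep records out of the root logger either way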
@@ -190,12 +196,23 @@ class SRUtil:
title_match = self.is_fuzzy_match(expected_title, found_title, threshold) title_match = self.is_fuzzy_match(expected_title, found_title, threshold)
return artist_match and album_match and title_match return artist_match and album_match and title_match
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]: def dedupe_by_key(
deduped = {} self, key: str | list[str], entries: list[dict]
) -> list[dict]:
"""Return entries de-duplicated by one or more keys."""
keys = [key] if isinstance(key, str) else list(key)
if not keys:
return entries
def normalize(value: Any) -> str:
return str(value or "").strip().lower()
deduped: dict[tuple[str, ...], dict] = {}
for entry in entries: for entry in entries:
norm = entry[key].strip().lower() composite_key = tuple(normalize(entry.get(k)) for k in keys)
if norm not in deduped: if composite_key not in deduped:
deduped[norm] = entry deduped[composite_key] = entry
return list(deduped.values()) return list(deduped.values())
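
dedupe_by_key now accepts either a single key or a list of keys and builds a normalized composite tuple, so two entries only collapse when every listed field matches after trimming and lowercasing. That is what lets the album listing below keep two releases that share a title but differ in releaseDate. Illustrative usage, given an SRUtil instance sr (the album values are made up):

albums = [
    {"title": "Greatest Hits", "releaseDate": "1998-06-01"},
    {"title": "Greatest Hits ", "releaseDate": "1998-06-01"},   # duplicate after normalization
    {"title": "Greatest Hits", "releaseDate": "2021-10-15"},    # reissue, kept
]
deduped = sr.dedupe_by_key(["title", "releaseDate"], albums)
assert len(deduped) == 2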
def group_artists_by_name( def group_artists_by_name(
@@ -444,7 +461,7 @@ class SRUtil:
return None return None
if not metadata: if not metadata:
return None return None
albums = self.dedupe_by_key("title", metadata.get("albums", [])) albums = self.dedupe_by_key(["title", "releaseDate"], metadata.get("albums", []))
albums_out = [ albums_out = [
{ {
"artist": ", ".join(artist["name"] for artist in album["artists"]), "artist": ", ".join(artist["name"] for artist in album["artists"]),
@@ -684,21 +701,22 @@ class SRUtil:
except Exception as e: except Exception as e:
# Exponential backoff with jitter for 429 or other errors # Exponential backoff with jitter for 429 or other errors
delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5) delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
if attempt < self.MAX_METADATA_RETRIES:
logging.warning( logging.warning(
"Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs", "Retrying metadata fetch for track %s (attempt %d/%d): %s. Next retry in %.2fs",
track_id, track_id,
attempt, attempt,
self.MAX_METADATA_RETRIES, self.MAX_METADATA_RETRIES,
str(e), str(e),
delay, delay,
) )
if attempt < self.MAX_METADATA_RETRIES:
await asyncio.sleep(delay) await asyncio.sleep(delay)
else: else:
logging.error( logging.error(
"Metadata fetch failed permanently for track %s after %d attempts", "Metadata fetch failed permanently for track %s after %d attempts: %s",
track_id, track_id,
self.MAX_METADATA_RETRIES, self.MAX_METADATA_RETRIES,
str(e),
) )
# Raise a specific exception so callers can react (e.g. notify) # Raise a specific exception so callers can react (e.g. notify)
raise MetadataFetchError( raise MetadataFetchError(
@@ -788,7 +806,7 @@ class SRUtil:
tracks_with_diff.sort(key=lambda x: x[1]) tracks_with_diff.sort(key=lambda x: x[1])
best_track, min_diff = tracks_with_diff[0] best_track, min_diff = tracks_with_diff[0]
logging.info(f"SR: Best match duration diff: {min_diff}s") logging.info(f"SR: Best match duration diff: {min_diff}s")
# If the closest match is more than 5 seconds off, consider no match # If the closest match is more than 10 seconds off, consider no match
if min_diff > 10: if min_diff > 10:
logging.info("SR: Duration diff too large, no match") logging.info("SR: Duration diff too large, no match")
return None return None
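
The matching step sorts candidates by absolute duration difference and now rejects even the closest one if it is more than 10 seconds off (the updated comment matches the existing min_diff > 10 check). A hedged sketch of that selection with a simplified candidate shape; the "duration" key and max_diff default are assumptions for illustration.

# Sketch: pick the candidate whose duration is closest to the target.
def pick_by_duration(candidates: list[dict], target_duration: int, max_diff: int = 10):
    tracks_with_diff = sorted(
        ((t, abs(t["duration"] - target_duration)) for t in candidates),
        key=lambda pair: pair[1],
    )
    if not tracks_with_diff:
        return None
    best_track, min_diff = tracks_with_diff[0]
    if min_diff > max_diff:          # closest match is still too far off
        return None
    return best_track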