Compare commits


16 Commits

Author SHA1 Message Date
6240888ac5 cull transcriptions endpoints from README.md 2025-12-03 13:56:28 -05:00
be9ed777a5 retire transcriptions endpoints 2025-12-03 13:55:52 -05:00
7779049c93 misc / sr_wrapper-- only consider an album returned from tidal to be a duplicate if both the name AND release date match. 2025-11-26 15:09:43 -05:00
41d9065c79 rm 2025-11-25 13:08:20 -05:00
85298b861d minor 2025-11-25 13:06:07 -05:00
476c4e6e51 bugfix: LRCLIb's models.py needs load_dotenv 2025-11-23 07:12:38 -05:00
353f14c899 formatting / minor 2025-11-22 21:43:48 -05:00
3d0b867427 Misc 2025-11-22 21:41:12 -05:00
dcc6c7b24e More progress re: #34
- Change of direction, LRCLib searches from /lyric/search now use internal cache - which is a PGSQL import of the LRCLib SQLite database.  Change to PGSQL was made for performance.
2025-11-22 13:13:03 -05:00
c302b256d3 Begin #34 2025-11-21 12:29:12 -05:00
c6d2bad79d Enhance lyric search functionality by improving line splitting logic and adding multi-line matching for subsearch. Update cache handling to ensure confidence threshold is respected before returning results. 2025-10-24 13:40:55 -04:00
25d1ab226e Enhance Cync authentication flow with improved token management and 2FA handling. Add periodic token validation and logging for better debugging. Introduce FLAC stream check in bulk download process. 2025-10-22 06:55:37 -04:00
3f66223328 Fix import statement for index_definition in redis_cache.py and radio_util.py (dependency upgrade related-- redis module) 2025-10-15 12:13:03 -04:00
c493f2aabf Increase rate limit for lighting state requests and enhance error handling for Cync device operations. Improve lyric search processing by splitting lyrics based on line breaks and cleaning special characters (bugfix for subsearch/seek). 2025-10-15 10:10:56 -04:00
0029a9ec19 . 2025-10-07 12:07:45 -04:00
90c3efbb8b Remove .gitignore file, radio database restructuring 2025-10-07 12:07:13 -04:00
18 changed files with 1172 additions and 672 deletions

.github/copilot-instructions.md (new file, vendored)

.gitignore (vendored, deleted) — 37 lines

@@ -1,37 +0,0 @@
**/__pycache__/*
**/.vscode/*
**/private.py
**/solibs/*
**/*.json
**/mgr/*
constants.py
tests.py
db_migrate.py
notifier.py
test_hifi.py
youtube*
playlist_creator.py
artist_genre_tag.py
pg_migrate_lyrics.py
uv.lock
pyproject.toml
mypy.ini
.python-version
get_next_track.py
endpoints/radio.py
utils/radio_util.py
redis_playlist.py
endpoints/auth.py
endpoints/radio2
endpoints/radio2/**
hash_password.py
up.py
job_review.py
check_missing.py
**/auth/*
**/radio_api/*
**/test/*
test/db_stats.py
test/report/*
.gitignore
.env

README.md

@@ -51,10 +51,6 @@ This server is built with [FastAPI](https://fastapi.tiangolo.com/) and provides
 **YouTube (`/yt/`)** 🎥
 - `POST /yt/search` - Search for YouTube videos by title
-**Transcriptions (`/transcriptions/`)** 📝
-- `POST /transcriptions/get_episodes` - Get episode list by show ID
-- `POST /transcriptions/get_episode_lines` - Get transcript lines for specific episodes
 **Lyric Search (`/lyric/`)** 🎵
 - `POST /lyric/search` - Search for song lyrics across multiple sources
   - Supports artist/song search, text search within lyrics
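For orientation, a minimal client sketch against the `/lyric/search` endpoint documented above. The field names (artist, song, sub, lrc, excluded_sources) are inferred from the lyric_search endpoint diff further down, and the base URL is a placeholder, so treat this as illustrative rather than a documented contract:

    import requests

    # Hypothetical request against a local instance of the API.
    payload = {
        "artist": "Artist Name",
        "song": "Song Title",
        "sub": "a phrase to seek within the lyrics",  # optional subsearch/seek
        "lrc": False,                                 # plain lyrics rather than synced LRC
        "excluded_sources": [],                       # e.g. ["genius"] to skip a source
    }
    resp = requests.post("http://localhost:8000/lyric/search", json=payload, timeout=10)
    resp.raise_for_status()
    print(resp.json())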

base.py — 19 lines changed

@@ -22,8 +22,8 @@ app = FastAPI(
     contact={"name": "codey"},
     redirect_slashes=False,
     loop=loop,
     docs_url="/docs",  # Swagger UI (default)
     redoc_url="/redoc",  # ReDoc UI (default, but explicitly set)
 )
 constants = importlib.import_module("constants").Constants()
@@ -36,6 +36,7 @@ origins = [
     "https://status.boatson.boats",
     "https://_new.codey.lol",
     "http://localhost:4321",
+    "https://local.codey.lol:4321",
 ]
 app.add_middleware(
@@ -46,13 +47,12 @@ app.add_middleware(
     allow_headers=["*"],
 )  # type: ignore
 # Add Scalar API documentation endpoint (before blacklist routes)
 @app.get("/scalar", include_in_schema=False)
 def scalar_docs():
-    return get_scalar_api_reference(
-        openapi_url="/openapi.json",
-        title="codey.lol API"
-    )
+    return get_scalar_api_reference(openapi_url="/openapi.json", title="codey.lol API")
 """
 Blacklisted routes
@@ -73,7 +73,9 @@ def base_head():
 def disallow_get_any(request: Request, var: Any = None):
     path = request.path_params["path"]
     allowed_paths = ["widget", "misc/no", "docs", "redoc", "scalar", "openapi.json"]
-    logging.info(f"Checking path: {path}, allowed: {path in allowed_paths or path.split('/', maxsplit=1)[0] in allowed_paths}")
+    logging.info(
+        f"Checking path: {path}, allowed: {path in allowed_paths or path.split('/', maxsplit=1)[0] in allowed_paths}"
+    )
     if not (
         isinstance(path, str)
         and (path.split("/", maxsplit=1)[0] in allowed_paths or path in allowed_paths)
@@ -101,9 +103,6 @@ routes: dict = {
     "randmsg": importlib.import_module("endpoints.rand_msg").RandMsg(
         app, util, constants
     ),
-    "transcriptions": importlib.import_module(
-        "endpoints.transcriptions"
-    ).Transcriptions(app, util, constants),
     "lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch(
         app, util, constants
     ),
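The routes dict in base.py registers endpoint modules dynamically via importlib; each class wires its own routes onto the app in its constructor. A minimal sketch of that registration pattern, assuming the module paths shown in the diff (they only resolve inside this repository):

    import importlib
    from fastapi import FastAPI

    app = FastAPI()
    util, constants = None, None  # stand-ins for the shared helpers passed in base.py

    # Map a key to (module path, class name); each class registers its routes in __init__.
    route_classes = {
        "randmsg": ("endpoints.rand_msg", "RandMsg"),
        "lyrics": ("endpoints.lyric_search", "LyricSearch"),
    }
    routes = {
        key: getattr(importlib.import_module(module_path), class_name)(app, util, constants)
        for key, (module_path, class_name) in route_classes.items()
    }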


@@ -3,6 +3,9 @@ import json
 import os
 import time
 import aiohttp
+import asyncio
+import traceback
+from datetime import datetime
 from fastapi import FastAPI, Depends, HTTPException, Request
 from fastapi_throttle import RateLimiter
 from fastapi.responses import JSONResponse
@@ -14,10 +17,75 @@ from pycync.user import User  # type: ignore
 from pycync.cync import Cync as Cync  # type: ignore
 from pycync import Auth  # type: ignore
 from pycync.exceptions import TwoFactorRequiredError, AuthFailedError  # type: ignore
+import inspect
+import getpass
+from typing import Optional
# Configure logging to write to a file for specific events
logging.basicConfig(
filename="cync_auth_events.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
def _mask_token(token: Optional[str]) -> str:
"""Mask sensitive token data for logging, showing only first/last 4 chars."""
if not token or len(token) < 8:
return "<invalid_token>"
return f"{token[:4]}...{token[-4:]}"
def _log_token_state(user, context: str):
"""Log masked token state for debugging."""
if not user:
logging.info(f"{context} - No user object available")
return
try:
logging.info(
f"{context} - Token state: access=%s refresh=%s expires_at=%s",
_mask_token(getattr(user, "access_token", None)),
_mask_token(getattr(user, "refresh_token", None)),
getattr(user, "expires_at", None),
)
except Exception as e:
logging.error(f"Error logging token state: {e}")
 class Lighting(FastAPI):
-    async def ensure_cync_connection(self):
+    async def _close_session_safely(self):
"""Safely close the current session if it exists."""
if self.session and not getattr(self.session, "closed", True):
try:
await self.session.close()
# Wait a bit for the underlying connections to close
await asyncio.sleep(0.1)
except Exception as e:
logging.warning(f"Error closing session: {e}")
self.session = None
self.auth = None
self.cync_api = None
async def _test_connection_health(self) -> bool:
"""Test if the current connection is healthy by making a simple API call."""
if (
not self.cync_api
or not self.session
or getattr(self.session, "closed", True)
):
return False
try:
# Make a simple API call to test connectivity
devices = self.cync_api.get_devices()
# Just check if we get a response without errors
return devices is not None
except Exception as e:
logging.warning(f"Connection health check failed: {e}")
return False
async def ensure_cync_connection(self, force_reconnect: bool = False):
"""Ensure aiohttp session and Cync API are alive, re-create if needed.""" """Ensure aiohttp session and Cync API are alive, re-create if needed."""
# Check required environment variables # Check required environment variables
missing_vars = [] missing_vars = []
@@ -31,58 +99,168 @@ class Lighting(FastAPI):
raise Exception( raise Exception(
f"Missing required environment variables: {', '.join(missing_vars)}" f"Missing required environment variables: {', '.join(missing_vars)}"
) )
# Cast to str after check to silence linter # Cast to str after check to silence linter
cync_email: str = self.cync_email # type: ignore cync_email: str = self.cync_email # type: ignore
cync_password: str = self.cync_password # type: ignore cync_password: str = self.cync_password # type: ignore
# Check if session is closed or missing # If force_reconnect is True or connection is unhealthy, rebuild everything
if not self.session or getattr(self.session, "closed", False): if force_reconnect or not await self._test_connection_health():
self.session = aiohttp.ClientSession() logging.info(
"Connection unhealthy or force reconnect requested. Rebuilding connection..."
)
# Clean up existing connection
await self._close_session_safely()
# Create new session with timeout configuration
timeout = aiohttp.ClientTimeout(total=30, connect=10)
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=30,
ttl_dns_cache=300,
use_dns_cache=True,
keepalive_timeout=60,
enable_cleanup_closed=True,
)
self.session = aiohttp.ClientSession(timeout=timeout, connector=connector)
# Load cached token and check validity
self.cync_user = None
cached_user = self._load_cached_user() cached_user = self._load_cached_user()
if cached_user: token_status = None
if cached_user and hasattr(cached_user, "expires_at"):
# Add buffer time - consider token expired if less than 5 minutes remaining
buffer_time = 300 # 5 minutes
if cached_user.expires_at > (time.time() + buffer_time):
token_status = "valid"
else:
token_status = "expired"
else:
token_status = "no cached user or missing expires_at"
logging.info(f"Cync token status: {token_status}")
if token_status == "valid" and cached_user is not None:
# Use cached token
self.auth = Auth( self.auth = Auth(
session=self.session, session=self.session,
user=cached_user, user=cached_user,
username=cync_email, username=cync_email,
password=cync_password, password=cync_password,
) )
self.cync_user = cached_user
logging.info("Reusing valid cached token, no 2FA required.")
else: else:
# Need fresh login - clear any cached user that's expired
if token_status == "expired":
try:
os.remove(self.token_cache_path)
logging.info("Removed expired token cache")
except (OSError, FileNotFoundError):
pass
logging.info("Initializing new Auth instance...")
self.auth = Auth( self.auth = Auth(
session=self.session, username=cync_email, password=cync_password session=self.session,
username=cync_email,
password=cync_password,
) )
# Try to refresh token
self.cync_user = None
if (
self.auth.user
and hasattr(self.auth.user, "expires_at")
and self.auth.user.expires_at > time.time()
):
try:
await self.auth.async_refresh_user_token()
self.cync_user = self.auth.user
self._save_cached_user(self.cync_user)
except AuthFailedError:
pass
# If no valid token, login
if not self.cync_user:
try: try:
logging.info("Attempting fresh login...")
self.cync_user = await self.auth.login() self.cync_user = await self.auth.login()
_log_token_state(self.cync_user, "After fresh login")
self._save_cached_user(self.cync_user) self._save_cached_user(self.cync_user)
logging.info("Fresh login successful")
except TwoFactorRequiredError: except TwoFactorRequiredError:
logging.error( twofa_code = os.getenv("CYNC_2FA_CODE")
"Cync 2FA required. Set CYNC_2FA_CODE in env if needed." if not twofa_code:
) print("Cync 2FA required. Please enter your code:")
raise Exception("Cync 2FA required.") twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
self.cync_user = await self.auth.login(
two_factor_code=twofa_code
)
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
logging.info(
"2FA failure details: Code=%s, User=%s",
twofa_code,
self.cync_user,
)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.")
except AuthFailedError as e: except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e) logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.") raise Exception("Cync authentication failed.")
self.cync_api = await Cync.create(self.auth)
# Also check if cync_api is None (shouldn't happen, but just in case) # Create new Cync API instance
if not self.cync_api: try:
if not self.auth: logging.info("Creating Cync API instance...")
logging.critical("self.auth: %s", self.auth) _log_token_state(self.auth.user, "Before Cync.create")
return self.cync_api = await Cync.create(self.auth)
self.cync_api = await Cync.create(self.auth) logging.info("Cync API connection established successfully")
except Exception as e:
logging.error("Failed to create Cync API instance")
logging.error("Exception details: %s", str(e))
logging.error("Traceback:\n%s", traceback.format_exc())
# Save diagnostic info
diagnostic_data = {
"timestamp": datetime.now().isoformat(),
"error_type": type(e).__name__,
"error_message": str(e),
"auth_state": {
"has_auth": bool(self.auth),
"has_user": bool(getattr(self.auth, "user", None)),
"user_state": {
"access_token": _mask_token(
getattr(self.auth.user, "access_token", None)
)
if self.auth and self.auth.user
else None,
"refresh_token": _mask_token(
getattr(self.auth.user, "refresh_token", None)
)
if self.auth and self.auth.user
else None,
"expires_at": getattr(self.auth.user, "expires_at", None)
if self.auth and self.auth.user
else None,
}
if self.auth and self.auth.user
else None,
},
}
diagnostic_file = f"cync_api_failure-{int(time.time())}.json"
try:
with open(diagnostic_file, "w") as f:
json.dump(diagnostic_data, f, indent=2)
logging.info(
f"Saved API creation diagnostic data to {diagnostic_file}"
)
except Exception as save_error:
logging.error(f"Failed to save diagnostic data: {save_error}")
raise
# Final validation
if (
not self.cync_api
or not self.session
or getattr(self.session, "closed", True)
):
logging.error("Connection validation failed after setup")
_log_token_state(
getattr(self.auth, "user", None), "Failed connection validation"
)
raise Exception("Failed to establish proper Cync connection")
""" """
Lighting Endpoints Lighting Endpoints
@@ -108,6 +286,7 @@ class Lighting(FastAPI):
         self.auth = None
         self.cync_user = None
         self.cync_api = None
+        self.health_check_task: Optional[asyncio.Task] = None
         # Set up Cync connection at startup using FastAPI event
         @app.on_event("startup")
@@ -125,76 +304,193 @@ class Lighting(FastAPI):
f"Missing required environment variables: {', '.join(missing_vars)}" f"Missing required environment variables: {', '.join(missing_vars)}"
) )
self.session = aiohttp.ClientSession() # Use ensure_cync_connection which has proper token caching
cached_user = self._load_cached_user() try:
if cached_user: await self.ensure_cync_connection()
self.auth = Auth( logging.info("Cync lighting system initialized successfully")
session=self.session, except Exception as e:
user=cached_user, logging.error(f"Failed to initialize Cync connection at startup: {e}")
username=self.cync_email or "", # Don't raise - allow server to start, connection will be retried on first request
password=self.cync_password or "",
)
else:
self.auth = Auth(
session=self.session,
username=self.cync_email or "",
password=self.cync_password or "",
)
# Try to refresh token
if (
self.auth.user
and hasattr(self.auth.user, "expires_at")
and self.auth.user.expires_at > time.time()
):
try:
await self.auth.async_refresh_user_token()
self.cync_user = self.auth.user
self._save_cached_user(self.cync_user)
except AuthFailedError:
pass
# If no valid token, login
if not self.cync_user:
try:
self.cync_user = await self.auth.login()
self._save_cached_user(self.cync_user)
except TwoFactorRequiredError:
logging.error(
"Cync 2FA required. Set CYNC_2FA_CODE in env if needed."
)
raise Exception("Cync 2FA required.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
# Create persistent Cync API object
self.cync_api = await Cync.create(self.auth)
# Schedule periodic token validation and connection health checks
self.health_check_task = asyncio.create_task(self._schedule_health_checks())
@app.on_event("shutdown")
async def shutdown_event():
# Cancel health check task
if self.health_check_task and not self.health_check_task.done():
self.health_check_task.cancel()
try:
await self.health_check_task
except asyncio.CancelledError:
logging.info("Health check task cancelled successfully")
pass
# Clean up connections
await self._close_session_safely()
logging.info("Cync lighting system shut down cleanly")
# Register endpoints
         self.endpoints: dict = {
             "lighting/state": self.get_lighting_state,
         }
         for endpoint, handler in self.endpoints.items():
-            app.add_api_route(
+            self.app.add_api_route(
                 f"/{endpoint}",
                 handler,
                 methods=["GET"],
                 include_in_schema=True,
                 dependencies=[
-                    Depends(RateLimiter(times=10, seconds=2)),
+                    Depends(RateLimiter(times=25, seconds=2)),
                     Depends(get_current_user),
                 ],
             )
-        app.add_api_route(
+        self.app.add_api_route(
             "/lighting/state",
             self.set_lighting_state,
             methods=["POST"],
             include_in_schema=True,
             dependencies=[
-                Depends(RateLimiter(times=10, seconds=2)),
+                Depends(RateLimiter(times=25, seconds=2)),
                 Depends(get_current_user),
             ],
         )
async def _refresh_or_login(self):
if not self.auth:
logging.error("Auth object is not initialized.")
raise Exception("Cync authentication not initialized.")
try:
user = getattr(self.auth, "user", None)
_log_token_state(user, "Before refresh attempt")
if user and hasattr(user, "expires_at") and user.expires_at > time.time():
refresh = getattr(self.auth, "async_refresh_user_token", None)
if callable(refresh):
try:
logging.info("Attempting token refresh...")
result = refresh()
if inspect.isawaitable(result):
await result
logging.info(
"Token refresh completed successfully (awaited)"
)
else:
logging.info("Token refresh completed (non-awaitable)")
except AuthFailedError as e:
logging.error("Token refresh failed with AuthFailedError")
logging.error("Exception details: %s", str(e))
logging.error("Traceback:\n%s", traceback.format_exc())
# Save diagnostic info to file
diagnostic_data = {
"timestamp": datetime.now().isoformat(),
"error_type": "AuthFailedError",
"error_message": str(e),
"user_state": {
"access_token": _mask_token(
getattr(user, "access_token", None)
),
"refresh_token": _mask_token(
getattr(user, "refresh_token", None)
),
"expires_at": getattr(user, "expires_at", None),
},
}
try:
diagnostic_file = (
f"cync_auth_failure-{int(time.time())}.json"
)
with open(diagnostic_file, "w") as f:
json.dump(diagnostic_data, f, indent=2)
logging.info(f"Saved diagnostic data to {diagnostic_file}")
except Exception as save_error:
logging.error(
f"Failed to save diagnostic data: {save_error}"
)
raise
login = getattr(self.auth, "login", None)
if callable(login):
try:
result = login()
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in successfully.")
except TwoFactorRequiredError:
twofa_code = os.getenv("CYNC_2FA_CODE")
if not twofa_code:
# Prompt interactively if not set
print("Cync 2FA required. Please enter your code:")
twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
result = login(two_factor_code=twofa_code)
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
logging.info(
"2FA failure details: Code=%s, User=%s",
twofa_code,
self.cync_user,
)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.")
else:
raise Exception("Auth object missing login method.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
except Exception as e:
logging.error("Unexpected error during authentication: %s", e)
raise
async def _schedule_health_checks(self):
"""Periodic health checks and token validation."""
while True:
try:
await asyncio.sleep(300) # Check every 5 minutes
# Check token expiration (refresh if less than 10 minutes left)
if self.cync_user and hasattr(self.cync_user, "expires_at"):
expires_at = getattr(self.cync_user, "expires_at", 0)
time_until_expiry = expires_at - time.time()
if time_until_expiry < 600: # Less than 10 minutes
logging.info(
f"Token expires in {int(time_until_expiry / 60)} minutes. Refreshing..."
)
try:
await self._refresh_or_login()
except Exception as e:
logging.error(
f"Token refresh failed during health check: {e}"
)
# Test connection health
if not await self._test_connection_health():
logging.warning(
"Connection health check failed. Will reconnect on next API call."
)
except asyncio.CancelledError:
logging.info("Health check task cancelled")
break
except Exception as e:
logging.error(f"Error during periodic health check: {e}")
# Continue the loop even on errors
def _load_cached_user(self): def _load_cached_user(self):
try: try:
if os.path.exists(self.token_cache_path): if os.path.exists(self.token_cache_path):
@@ -253,8 +549,10 @@
         """
         Set the lighting state and apply it to the Cync device.
         """
+        logging.info("=== LIGHTING STATE REQUEST RECEIVED ===")
         try:
             state = await request.json()
+            logging.info(f"Requested state: {state}")
             # Validate state (basic validation)
             if not isinstance(state, dict):
                 raise HTTPException(
@@ -266,7 +564,7 @@
             await self.ensure_cync_connection()
-            # Apply to Cync device
+            # Validate and extract state values
             power = state.get("power", "off")
             if power not in ["on", "off"]:
                 raise HTTPException(
@@ -296,41 +594,134 @@ class Lighting(FastAPI):
else: else:
rgb = None rgb = None
# Use persistent Cync API object # Apply to Cync device with robust retry and error handling
if not self.cync_api: max_retries = 3
raise HTTPException(status_code=500, detail="Cync API not initialized.") last_exception: Exception = Exception("No attempts made")
devices = self.cync_api.get_devices()
if not devices or not isinstance(devices, (list, tuple)): for attempt in range(max_retries):
raise HTTPException( try:
status_code=500, detail="No devices returned from Cync API." # Ensure connection before each attempt
) force_reconnect = attempt > 0 # Force reconnect on retries
light = next( await self.ensure_cync_connection(force_reconnect=force_reconnect)
(
d if not self.cync_api:
for d in devices raise Exception("Cync API not available after connection setup")
if hasattr(d, "name") and d.name == self.cync_device_name
), logging.info(
None, f"Attempt {attempt + 1}/{max_retries}: Getting devices from Cync API..."
)
devices = self.cync_api.get_devices()
if not devices:
raise Exception("No devices returned from Cync API")
logging.info(
f"Devices returned: {[getattr(d, 'name', 'unnamed') for d in devices]}"
)
light = next(
(
d
for d in devices
if hasattr(d, "name") and d.name == self.cync_device_name
),
None,
)
if not light:
available_devices = [
getattr(d, "name", "unnamed") for d in devices
]
raise Exception(
f"Device '{self.cync_device_name}' not found. Available devices: {available_devices}"
)
logging.info(
f"Selected device: {getattr(light, 'name', 'unnamed')}"
)
# Execute device operations
operations_completed = []
# Set power
if power == "on":
result = await light.turn_on()
operations_completed.append(f"turn_on: {result}")
else:
result = await light.turn_off()
operations_completed.append(f"turn_off: {result}")
# Set brightness
if "brightness" in state:
result = await light.set_brightness(brightness)
operations_completed.append(
f"set_brightness({brightness}): {result}"
)
# Set color
if rgb:
result = await light.set_rgb(rgb)
operations_completed.append(f"set_rgb({rgb}): {result}")
logging.info(
f"All operations completed successfully: {operations_completed}"
)
break # Success, exit retry loop
except (
aiohttp.ClientConnectionError,
aiohttp.ClientOSError,
aiohttp.ServerDisconnectedError,
aiohttp.ClientConnectorError,
ConnectionResetError,
ConnectionError,
OSError,
asyncio.TimeoutError,
) as e:
last_exception = e
logging.warning(
f"Connection/network error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
)
if attempt < max_retries - 1:
# Wait a bit before retry to allow network/server recovery
await asyncio.sleep(
2**attempt
) # Exponential backoff: 1s, 2s, 4s
continue
except (AuthFailedError, TwoFactorRequiredError) as e:
last_exception = e
logging.error(
f"Authentication error (attempt {attempt + 1}/{max_retries}): {e}"
)
if attempt < max_retries - 1:
# Clear cached tokens on auth errors
try:
os.remove(self.token_cache_path)
logging.info("Cleared token cache due to auth error")
except (OSError, FileNotFoundError):
pass
await asyncio.sleep(1)
continue
except Exception as e:
last_exception = e
error_msg = f"Unexpected error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
logging.error(error_msg)
# On unexpected errors, try reconnecting for next attempt
if attempt < max_retries - 1:
logging.warning(
"Forcing full reconnection due to unexpected error..."
)
await asyncio.sleep(1)
continue
# If we get here, all retries failed
logging.error(
f"All {max_retries} attempts failed. Last error: {type(last_exception).__name__}: {last_exception}"
) )
if not light: raise last_exception
raise HTTPException(
status_code=404,
detail=f"Device '{self.cync_device_name}' not found",
)
# Set power
if power == "on":
await light.turn_on()
else:
await light.turn_off()
# Set brightness
if "brightness" in state:
await light.set_brightness(brightness)
# Set color
if rgb:
await light.set_rgb(rgb)
logging.info( logging.info(
"Successfully applied state to device '%s': %s", "Successfully applied state to device '%s': %s",


@@ -1,4 +1,3 @@
-import logging
 import os
 import urllib.parse
 import regex
@@ -81,9 +80,9 @@ class LyricSearch(FastAPI):
         )
         for endpoint, handler in self.endpoints.items():
-            times: int = 20
+            times: int = 5
             seconds: int = 2
-            rate_limit: tuple[int, int] = (2, 3)  # Default; (Times, Seconds)
+            rate_limit: tuple[int, int] = (times, seconds)  # Default; (Times, Seconds)
             _schema_include = endpoint in ["lyric/search"]
             if (
@@ -187,7 +186,7 @@
             },
         )
-    excluded_sources: Optional[list] = data.excluded_sources
+    excluded_sources: list = data.excluded_sources or []
     aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
     plain_lyrics: bool = not data.lrc
     result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search(
@@ -210,29 +209,93 @@ class LyricSearch(FastAPI):
if data.sub and not data.lrc: if data.sub and not data.lrc:
seeked_found_line: Optional[int] = None seeked_found_line: Optional[int] = None
lyric_lines: list[str] = result["lyrics"].strip().split(" / ") # Split lyrics into lines based on <br>, newline characters, or " / "
lyrics_text = result["lyrics"].strip()
# Determine the delimiter and split accordingly
if "<br>" in lyrics_text:
lyric_lines = lyrics_text.split("<br>")
separator = "<br>"
elif " / " in lyrics_text:
lyric_lines = lyrics_text.split(" / ")
separator = " / "
else:
lyric_lines = lyrics_text.split("\n")
separator = "\n"
search_term = data.sub.strip().lower()
# First try single-line matching (existing behavior)
for i, line in enumerate(lyric_lines): for i, line in enumerate(lyric_lines):
line = regex.sub(r"\u2064", "", line.strip()) # Remove any special characters and extra spaces
if data.sub.strip().lower() in line.strip().lower(): cleaned_line = regex.sub(r"\u2064", "", line.strip())
if search_term in cleaned_line.lower():
seeked_found_line = i seeked_found_line = i
logging.debug(
"Found %s at %s, match for %s!",
line,
seeked_found_line,
data.sub,
) # REMOVEME: DEBUG
break break
if not seeked_found_line: # If no single-line match found, try multi-line matching
if seeked_found_line is None:
# Try matching across consecutive lines (up to 5 lines for reasonable performance)
max_lines_to_check = min(5, len(lyric_lines))
for i in range(len(lyric_lines)):
for line_count in range(2, max_lines_to_check + 1):
if i + line_count <= len(lyric_lines):
# Combine consecutive lines with space separator
combined_lines = []
line_positions: list[
tuple[int, int]
] = [] # Track where each line starts in combined text
combined_text_parts: list[str] = []
for j in range(line_count):
if i + j < len(lyric_lines):
cleaned_line = regex.sub(
r"\u2064", "", lyric_lines[i + j].strip()
)
combined_lines.append(cleaned_line)
# Track position of this line in the combined text
line_start_pos = len(
" ".join(combined_text_parts).lower()
)
if line_start_pos > 0:
line_start_pos += (
1 # Account for space separator
)
line_positions.append((i + j, line_start_pos))
combined_text_parts.append(cleaned_line)
combined_text = " ".join(combined_lines).lower()
if search_term in combined_text:
# Find which specific line the match starts in
match_pos = combined_text.find(search_term)
# Find the line that contains the start of the match
actual_start_line = i # Default fallback
for line_idx, line_start_pos in line_positions:
if line_start_pos <= match_pos:
actual_start_line = line_idx
else:
break
seeked_found_line = actual_start_line
break
if seeked_found_line is not None:
break
if seeked_found_line is None:
return JSONResponse( return JSONResponse(
status_code=500,
content={ content={
"err": True, "err": True,
"errorText": "Seek (a.k.a. subsearch) failed.", "errorText": "Seek (a.k.a. subsearch) failed.",
"failed_seek": True, "failed_seek": True,
}, },
) )
result["lyrics"] = " / ".join(lyric_lines[seeked_found_line:]) # Only include lines strictly starting from the matched line
result["lyrics"] = separator.join(lyric_lines[seeked_found_line:])
result["confidence"] = int(result["confidence"]) result["confidence"] = int(result["confidence"])
result["time"] = f"{float(result['time']):.4f}" result["time"] = f"{float(result['time']):.4f}"


@@ -307,7 +307,7 @@ class Radio(FastAPI):
         }
     )
-    async def album_art_handler(
+    def album_art_handler(
         self, request: Request, track_id: Optional[int] = None,
         station: Station = "main"
     ) -> Response:
@@ -364,13 +364,21 @@ class Radio(FastAPI):
         ret_obj: dict = {**self.radio_util.now_playing[station]}
         ret_obj["station"] = station
         try:
-            ret_obj["elapsed"] = int(time.time()) - ret_obj["start"]
+            ret_obj["elapsed"] = int(time.time()) - ret_obj["start"] if ret_obj["start"] else 0
         except KeyError:
             traceback.print_exc()
             ret_obj["elapsed"] = 0
         ret_obj.pop("file_path")
         return JSONResponse(content=ret_obj)
+    def _bg_cache_art(self, track_id: int, file_path: str):
+        try:
+            album_art = self.radio_util.get_album_art(track_id=track_id)
+            if not album_art:
+                self.radio_util.cache_album_art(track_id, file_path)
+        except Exception as e:
+            logging.error(f"Background art cache error: {e}")
     async def radio_get_next(
         self,
         data: ValidRadioNextRequest,
@@ -448,13 +456,7 @@ class Radio(FastAPI):
             logging.info("radio_get_next Exception: %s", str(e))
             traceback.print_exc()
-        try:
-            album_art = self.radio_util.get_album_art(track_id=next["id"])
-            if not album_art:
-                self.radio_util.cache_album_art(next["id"], next["file_path"])
-        except Exception as e:
-            logging.info("radio_get_next Exception: %s", str(e))
-            traceback.print_exc()
+        background_tasks.add_task(self._bg_cache_art, next["id"], next["file_path"])
         return JSONResponse(content=next)
@@ -496,8 +498,12 @@
             },
         )
-    search: bool = self.radio_util.search_db(
-        artistsong=artistsong, artist=artist, song=song, station=data.station
-    )
+    loop = asyncio.get_running_loop()
+    search: bool = await loop.run_in_executor(
+        None,
+        lambda: self.radio_util.search_db(
+            artistsong=artistsong, artist=artist, song=song, station=data.station
+        )
+    )
     if data.alsoSkip:
         await self.radio_util._ls_skip(data.station)
@@ -764,9 +770,24 @@
     logging.info(f"[LRC] Starting fetch for {station}: {artist} - {title}")
-    # Try SR first with timeout
+    # Try LRCLib first with timeout
     try:
-        async with asyncio.timeout(5.0):  # 5 second timeout
+        async with asyncio.timeout(10.0):  # 10 second timeout
+            logging.info("[LRC] Trying LRCLib")
+            lrclib_result = await self.lrclib.search(artist, title, plain=False, raw=True)
+            if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
+                logging.info("[LRC] Found from LRCLib")
+                return lrclib_result.lyrics, "LRCLib"
+    except asyncio.TimeoutError:
+        logging.warning("[LRC] LRCLib fetch timed out")
+    except Exception as e:
+        logging.error(f"[LRC] LRCLib fetch error: {e}")
+    logging.info("[LRC] LRCLib fetch completed without results")
+    # Try SR as fallback with timeout
+    try:
+        async with asyncio.timeout(10.0):  # 10 second timeout
             lrc = await self.sr_util.get_lrc_by_artist_song(
                 artist, title, duration=duration
             )
@@ -778,21 +799,6 @@
     except Exception as e:
         logging.error(f"[LRC] SR fetch error: {e}")
     logging.info("[LRC] SR fetch completed without results")
-    # Try LRCLib as fallback with timeout
-    try:
-        async with asyncio.timeout(5.0):  # 5 second timeout
-            logging.info("[LRC] Trying LRCLib fallback")
-            lrclib_result = await self.lrclib.search(artist, title, plain=False)
-            if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
-                logging.info("[LRC] Found from LRCLib")
-                return lrclib_result.lyrics, "LRCLib"
-    except asyncio.TimeoutError:
-        logging.warning("[LRC] LRCLib fetch timed out")
-    except Exception as e:
-        logging.error(f"[LRC] LRCLib fetch error: {e}")
     logging.info("[LRC] No lyrics found from any source")
     return None, "None"
except Exception as e: except Exception as e:
@@ -804,11 +810,21 @@ class Radio(FastAPI):
try: try:
async with self.lrc_cache_locks[station]: async with self.lrc_cache_locks[station]:
self.lrc_cache.pop(station, None) self.lrc_cache.pop(station, None)
lrc, source = await self._fetch_and_cache_lrc(station, track_json)
if lrc: lrc, source = await self._fetch_and_cache_lrc(station, track_json)
self.lrc_cache[station] = lrc
async with self.lrc_cache_locks[station]:
# Verify we are still on the same song
current_track = self.radio_util.now_playing.get(station)
if current_track and current_track.get("uuid") == track_json.get("uuid"):
if lrc:
self.lrc_cache[station] = lrc
else:
self.lrc_cache[station] = None
else: else:
self.lrc_cache[station] = None logging.info(f"[LRC] Discarding fetch result for {station} as track changed.")
return
if lrc: if lrc:
await self.broadcast_lrc(station, lrc, source) await self.broadcast_lrc(station, lrc, source)
except Exception as e: except Exception as e:
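The search_db change above moves a blocking call off the event loop with run_in_executor. A minimal illustration of the same pattern with a stand-in blocking function:

    import asyncio
    import time

    def blocking_search(query: str) -> bool:
        time.sleep(0.5)  # stands in for a synchronous DB lookup
        return query == "hit"

    async def search(query: str) -> bool:
        loop = asyncio.get_running_loop()
        # None selects the default ThreadPoolExecutor; the lambda defers the call with its arguments.
        return await loop.run_in_executor(None, lambda: blocking_search(query))

    print(asyncio.run(search("hit")))  # True, without blocking the event loop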


@@ -20,6 +20,9 @@ from lyric_search.sources import private
 from typing import Literal
 from pydantic import BaseModel
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
 class ValidBulkFetchRequest(BaseModel):
     track_ids: list[int]


@@ -1,171 +0,0 @@
import os
import aiosqlite as sqlite3
from fastapi import FastAPI, Depends, Response
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union, Iterable, cast
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
class Transcriptions(FastAPI):
"""
Transcription Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None:
"""Initialize Transcriptions endpoints."""
self.app: FastAPI = app
self.util = util
self.constants = constants
self.endpoints: dict = {
"transcriptions/get_episodes": self.get_episodes_handler,
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
# tbd
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(
f"/{endpoint}",
handler,
methods=["POST"],
include_in_schema=True,
dependencies=[Depends(RateLimiter(times=2, seconds=2))],
)
async def get_episodes_handler(
self, data: ValidShowEpisodeListRequest
) -> JSONResponse:
"""
Get list of episodes by show ID.
Parameters:
- **data** (ValidShowEpisodeListRequest): Request containing show ID.
Returns:
- **JSONResponse**: Contains a list of episodes.
"""
show_id: int = data.s
db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None
show_title: Optional[str] = None
if not isinstance(show_id, int):
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Invalid request",
},
)
show_id = int(show_id)
if not (str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Show not found.",
},
)
match show_id:
case 0:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db")
db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
show_title = "South Park"
case 1:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Futurama"
case 2:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Parks And Rec"
case _:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Unknown error.",
},
)
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
return JSONResponse(
content={
"show_title": show_title,
"episodes": [
{
"id": item[1],
"ep_friendly": item[0],
}
for item in result
],
}
)
async def get_episode_lines_handler(
self, data: ValidShowEpisodeLineRequest
) -> Response:
"""
Get lines for a particular episode.
Parameters:
- **data** (ValidShowEpisodeLineRequest): Request containing show and episode ID.
Returns:
- **Response**: Episode lines.
"""
show_id: int = int(data.s)
episode_id: int = int(data.e)
match show_id:
case 0:
db_path: Union[str, LiteralString] = os.path.join(
"/usr/local/share", "sqlite_dbs", "sp.db"
)
db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
case 2:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
case _:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Unknown error",
},
)
async with sqlite3.connect(database=db_path, timeout=1) as _db:
params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
result_list = cast(list[sqlite3.Row], result)
if not result_list:
return Response(
status_code=404,
content="Not found",
)
first_result: sqlite3.Row = result_list[0]
return JSONResponse(
content={
"episode_id": episode_id,
"ep_friendly": first_result[0].strip(),
"lines": [
{
"speaker": item[1].strip(),
"line": item[2].strip(),
}
for item in result
],
}
)

lyric_search/models.py (new file) — 112 lines

@@ -0,0 +1,112 @@
"""
Database models for LRCLib lyrics cache.
"""
import os
import urllib.parse
from typing import Type, AsyncGenerator
from sqlalchemy import (
Column,
Integer,
String,
Float,
Boolean,
DateTime,
ForeignKey,
UniqueConstraint,
)
from sqlalchemy.orm import relationship, foreign
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from dotenv import load_dotenv
load_dotenv()
Base: Type[DeclarativeMeta] = declarative_base()
class Tracks(Base): # type: ignore
"""Tracks table - stores track metadata."""
__tablename__ = "tracks"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String, index=True)
name_lower = Column(String, index=True)
artist_name = Column(String, index=True)
artist_name_lower = Column(String, index=True)
album_name = Column(String)
album_name_lower = Column(String, index=True)
duration = Column(Float, index=True)
last_lyrics_id = Column(Integer, ForeignKey("lyrics.id"), index=True)
created_at = Column(DateTime)
updated_at = Column(DateTime)
# Relationships
lyrics = relationship(
"Lyrics",
back_populates="track",
foreign_keys=[last_lyrics_id],
primaryjoin="Tracks.id == foreign(Lyrics.track_id)",
)
# Constraints
__table_args__ = (
UniqueConstraint(
"name_lower",
"artist_name_lower",
"album_name_lower",
"duration",
name="uq_tracks",
),
)
class Lyrics(Base): # type: ignore
"""Lyrics table - stores lyrics content."""
__tablename__ = "lyrics"
id = Column(Integer, primary_key=True, autoincrement=True)
plain_lyrics = Column(String)
synced_lyrics = Column(String)
track_id = Column(Integer, ForeignKey("tracks.id"), index=True)
has_plain_lyrics = Column(Boolean, index=True)
has_synced_lyrics = Column(Boolean, index=True)
instrumental = Column(Boolean)
source = Column(String, index=True)
created_at = Column(DateTime, index=True)
updated_at = Column(DateTime)
# Relationships
track = relationship(
"Tracks",
back_populates="lyrics",
foreign_keys=[track_id],
primaryjoin=(Tracks.id == foreign(track_id)),
remote_side=Tracks.id,
)
# PostgreSQL connection - using environment variables
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432")
POSTGRES_DB = os.getenv("POSTGRES_DB", "lrclib")
POSTGRES_USER = os.getenv("POSTGRES_USER", "api")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
# URL-encode the password to handle special characters
encoded_password = urllib.parse.quote_plus(POSTGRES_PASSWORD)
DATABASE_URL: str = f"postgresql+asyncpg://{POSTGRES_USER}:{encoded_password}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
async_engine: AsyncEngine = create_async_engine(
DATABASE_URL, pool_size=20, max_overflow=10, pool_pre_ping=True, echo=False
)
AsyncSessionLocal = async_sessionmaker(bind=async_engine, expire_on_commit=False)
async def get_async_db():
"""Get async database session."""
async with AsyncSessionLocal() as session:
yield session
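get_async_db above is an async generator intended for dependency injection, while AsyncSessionLocal can be used directly. A brief usage sketch against the models defined in this file (the query itself is illustrative, not taken from the codebase):

    from sqlalchemy import select
    from lyric_search.models import AsyncSessionLocal, Tracks

    async def count_tracks_by(artist_lower: str) -> int:
        # Open a session from the async sessionmaker and run a simple filtered query.
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Tracks).where(Tracks.artist_name_lower == artist_lower)
            )
            return len(result.scalars().all())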


@@ -14,9 +14,7 @@ class Aggregate:
     Aggregate all source methods
     """
-    def __init__(self, exclude_methods=None) -> None:
-        if not exclude_methods:
-            exclude_methods: list = []
+    def __init__(self, exclude_methods: list = []) -> None:
         self.exclude_methods = exclude_methods
         self.redis_cache = redis_cache.RedisCache()
        self.notifier = notifier.DiscordNotifier()
@@ -70,14 +68,14 @@ class Aggregate:
         if plain:  # do not record LRC fails
             try:
                 await self.redis_cache.increment_found_count("failed")
-                self.notifier.send(
-                    "WARNING",
-                    f"Could not find {artist} - {song} via queried sources.",
-                )
+                # await self.notifier.send(
+                #     "WARNING",
+                #     f"Could not find {artist} - {song} via queried sources.",
+                # )
             except Exception as e:
                 traceback.print_exc()
                 logging.info("Could not increment redis failed counter: %s", str(e))
-                self.notifier.send(
+                await self.notifier.send(
                     f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
                     f"Could not increment redis failed counter: {str(e)}",
                 )


@@ -343,15 +343,19 @@
             )
         else:
             best_match = (result_tracks[0], 100)
-        if not best_match or confidence < 90:
+        if not best_match:
             return None
         (candidate, confidence) = best_match
+        if confidence < 90:
+            return None
         logging.info("Result found on %s", self.label)
         matched = self.get_matched(
             sqlite_rows=results,
             matched_candidate=candidate,
             confidence=confidence,
         )
+        if matched is None:
+            return None
         time_end: float = time.time()
         time_diff: float = time_end - time_start
         matched.time = time_diff


@@ -45,11 +45,11 @@ class Genius:
             Optional[LyricsResult]: The result, if found - None otherwise.
         """
         try:
-            artist = artist.strip().lower()
-            song = song.strip().lower()
+            artist_name = artist.strip().lower()
+            song_name = song.strip().lower()
             time_start: float = time.time()
-            logging.info("Searching %s - %s on %s", artist, song, self.label)
-            search_term: str = f"{artist}%20{song}"
+            logging.info("Searching %s - %s on %s", artist_name, song_name, self.label)
+            search_term: str = f"{artist_name}%20{song_name}"
             returned_lyrics: str = ""
             async with ClientSession() as client:
                 async with client.get(
@@ -100,10 +100,13 @@
                     )
                     for returned in possible_matches
                 ]
-                searched: str = f"{artist} - {song}"
-                best_match: tuple = self.matcher.find_best_match(
+                searched: str = f"{artist_name} - {song_name}"
+                best_match: Optional[tuple] = self.matcher.find_best_match(
                     input_track=searched, candidate_tracks=to_scrape
                 )
+                if not best_match:
+                    raise InvalidGeniusResponseException("No matching result")
                 logging.info("To scrape: %s", to_scrape)
                 ((scrape_stub, track), confidence) = best_match
                 scrape_url: str = f"{self.genius_url}{scrape_stub[1:]}"
@@ -157,8 +160,8 @@
                 returned_lyrics: str = self.datautils.scrub_lyrics(
                     returned_lyrics
                 )
-                artist: str = track.split(" - ", maxsplit=1)[0]
-                song: str = track.split(" - ", maxsplit=1)[1]
+                artist = track.split(" - ", maxsplit=1)[0]
+                song = track.split(" - ", maxsplit=1)[1]
                 logging.info("Result found on %s", self.label)
                 time_end: float = time.time()
                 time_diff: float = time_end - time_start


@@ -1,45 +1,41 @@
 import time
-import traceback
 import logging
-from typing import Optional, Union
-from aiohttp import ClientTimeout, ClientSession
-from tenacity import retry, stop_after_attempt, wait_fixed
+from typing import Optional
+from sqlalchemy.future import select
 from lyric_search import utils
 from lyric_search.constructors import LyricsResult
-from . import common, cache, redis_cache
-from lyric_search.constructors import InvalidLRCLibResponseException
+from lyric_search.models import Tracks, Lyrics, AsyncSessionLocal
+from . import redis_cache
 logger = logging.getLogger()
 log_level = logging.getLevelName(logger.level)
 class LRCLib:
-    """LRCLib Search Module"""
+    """LRCLib Search Module - Local PostgreSQL Database"""
     def __init__(self) -> None:
-        self.label: str = "LRCLib"
-        self.lrclib_url: str = "https://lrclib.net/api/search"
-        self.headers: dict = common.SCRAPE_HEADERS
-        self.timeout = ClientTimeout(connect=3, sock_read=8)
+        self.label: str = "LRCLib-Cache"
         self.datautils = utils.DataUtils()
         self.matcher = utils.TrackMatcher()
-        self.cache = cache.Cache()
         self.redis_cache = redis_cache.RedisCache()
-    @retry(stop=stop_after_attempt(2), wait=wait_fixed(0.5))
     async def search(
         self,
         artist: str,
         song: str,
         plain: Optional[bool] = True,
         duration: Optional[int] = None,
+        raw: bool = False,
     ) -> Optional[LyricsResult]:
         """
-        LRCLib Search
+        LRCLib Local Database Search
         Args:
             artist (str): the artist to search
             song (str): the song to search
+            plain (bool): return plain lyrics (True) or synced lyrics (False)
+            duration (int): optional track duration for better matching
+            raw (bool): return raw LRC string instead of parsed object (only for synced)
         Returns:
             Optional[LyricsResult]: The result, if found - None otherwise.
         """
@@ -47,140 +43,117 @@ class LRCLib:
artist = artist.strip().lower() artist = artist.strip().lower()
song = song.strip().lower() song = song.strip().lower()
time_start: float = time.time() time_start: float = time.time()
lrc_obj: Optional[list[dict]] = None
logging.info("Searching %s - %s on %s", artist, song, self.label) logging.info("Searching %s - %s on %s", artist, song, self.label)
Removed (previous implementation, the body of the method's try block plus its handler; it queried the LRCLib HTTP API and fuzzy-matched the JSON results):

    input_track: str = f"{artist} - {song}"
    returned_lyrics: str = ""
    async with ClientSession() as client:
        async with await client.get(
            self.lrclib_url,
            params={
                "artist_name": artist,
                "track_name": song,
                **({"duration": duration} if duration else {}),
            },
            timeout=self.timeout,
            headers=self.headers,
        ) as request:
            request.raise_for_status()
            text: Optional[str] = await request.text()
            if not text:
                raise InvalidLRCLibResponseException("No search response.")
            if len(text) < 100:
                raise InvalidLRCLibResponseException(
                    "Search response text was invalid (len < 100 chars.)"
                )
            search_data: Optional[Union[list, dict]] = await request.json()
            if not isinstance(search_data, list | dict):
                raise InvalidLRCLibResponseException("No JSON search data.")
            # logging.info("Search Data:\n%s", search_data)
            if not isinstance(search_data, list):
                raise InvalidLRCLibResponseException("Invalid JSON.")
            # Filter by duration if provided
            if duration:
                search_data = [
                    r
                    for r in search_data
                    if abs(r.get("duration", 0) - duration) <= 10
                ]
            if plain:
                possible_matches = [
                    (
                        x,
                        f"{result.get('artistName')} - {result.get('trackName')}",
                    )
                    for x, result in enumerate(search_data)
                ]
            else:
                logging.info(
                    "Limiting possible matches to only those with non-null syncedLyrics"
                )
                possible_matches = [
                    (
                        x,
                        f"{result.get('artistName')} - {result.get('trackName')}",
                    )
                    for x, result in enumerate(search_data)
                    if isinstance(result["syncedLyrics"], str)
                ]
            best_match = None
            try:
                match_result = self.matcher.find_best_match(
                    input_track,
                    possible_matches,  # type: ignore
                )
                if match_result:
                    best_match = match_result[0]
            except:  # noqa
                pass
            if not best_match:
                return
            best_match_id = best_match[0]
            if not isinstance(search_data[best_match_id]["artistName"], str):
                raise InvalidLRCLibResponseException(
                    f"Invalid JSON: Cannot find artistName key.\n{search_data}"
                )
            if not isinstance(search_data[best_match_id]["trackName"], str):
                raise InvalidLRCLibResponseException(
                    f"Invalid JSON: Cannot find trackName key.\n{search_data}"
                )
            returned_artist: str = search_data[best_match_id]["artistName"]
            returned_song: str = search_data[best_match_id]["trackName"]
            if plain:
                if not isinstance(
                    search_data[best_match_id]["plainLyrics"], str
                ):
                    raise InvalidLRCLibResponseException(
                        f"Invalid JSON: Cannot find plainLyrics key.\n{search_data}"
                    )
                returned_lyrics: str = search_data[best_match_id]["plainLyrics"]
                returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
            else:
                if not isinstance(
                    search_data[best_match_id]["syncedLyrics"], str
                ):
                    raise InvalidLRCLibResponseException(
                        f"Invalid JSON: Cannot find syncedLyrics key.\n{search_data}"
                    )
                returned_lyrics: str = search_data[best_match_id][
                    "syncedLyrics"
                ]
            lrc_obj = self.datautils.create_lrc_object(returned_lyrics)
            returned_track: str = f"{returned_artist} - {returned_song}"
            match_result = self.matcher.find_best_match(
                input_track=input_track, candidate_tracks=[(0, returned_track)]
            )
            if not match_result:
                return  # No suitable match found
            _matched, confidence = match_result
            logging.info("Result found on %s", self.label)
            time_end: float = time.time()
            time_diff: float = time_end - time_start
            matched = LyricsResult(
                artist=returned_artist,
                song=returned_song,
                src=self.label,
                lyrics=returned_lyrics if plain else lrc_obj,  # type: ignore
                confidence=confidence,
                time=time_diff,
            )
            await self.redis_cache.increment_found_count(self.label)
            if plain:
                await self.cache.store(matched)
            return matched
except Exception as e:
    logging.debug("Exception: %s", str(e))
    traceback.print_exc()

Added (replacement implementation; the same block now reads the local LRCLib import through SQLAlchemy, trying an exact match, then a prefix match, then a substring match):

    async with AsyncSessionLocal() as db:
        best_match = None

        # Try exact match first (fastest)
        result = await db.execute(
            select(
                Tracks.artist_name,
                Tracks.name,
                Lyrics.plain_lyrics,
                Lyrics.synced_lyrics,
            )
            .join(Lyrics, Tracks.id == Lyrics.track_id)
            .filter(
                Tracks.artist_name_lower == artist,
                Tracks.name_lower == song,
            )
            .limit(1)
        )
        best_match = result.first()

        # If no exact match, try prefix match (faster than full ILIKE)
        if not best_match:
            result = await db.execute(
                select(
                    Tracks.artist_name,
                    Tracks.name,
                    Lyrics.plain_lyrics,
                    Lyrics.synced_lyrics,
                )
                .join(Lyrics, Tracks.id == Lyrics.track_id)
                .filter(
                    Tracks.artist_name_lower.like(f"{artist}%"),
                    Tracks.name_lower.like(f"{song}%"),
                )
                .limit(1)
            )
            best_match = result.first()

        # If still no match, try full ILIKE (slowest)
        if not best_match:
            result = await db.execute(
                select(
                    Tracks.artist_name,
                    Tracks.name,
                    Lyrics.plain_lyrics,
                    Lyrics.synced_lyrics,
                )
                .join(Lyrics, Tracks.id == Lyrics.track_id)
                .filter(
                    Tracks.artist_name_lower.ilike(f"%{artist}%"),
                    Tracks.name_lower.ilike(f"%{song}%"),
                )
                .limit(1)
            )
            best_match = result.first()

        if not best_match:
            logging.info("No result found on %s", self.label)
            return None

        returned_artist = best_match.artist_name
        returned_song = best_match.name

        if plain:
            if not best_match.plain_lyrics:
                logging.info("No plain lyrics available on %s", self.label)
                return None
            returned_lyrics = best_match.plain_lyrics
            returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
            lrc_obj = None
        else:
            if not best_match.synced_lyrics:
                logging.info("No synced lyrics available on %s", self.label)
                return None
            returned_lyrics = best_match.synced_lyrics
            if raw:
                lrc_obj = returned_lyrics
            else:
                lrc_obj = self.datautils.create_lrc_object(returned_lyrics)

        # Calculate match confidence
        input_track = f"{artist} - {song}"
        returned_track = f"{returned_artist} - {returned_song}"
        match_result = self.matcher.find_best_match(
            input_track=input_track, candidate_tracks=[(0, returned_track)]
        )

        if not match_result:
            return None

        _matched, confidence = match_result

        logging.info("Result found on %s", self.label)
        time_end = time.time()
        time_diff = time_end - time_start

        matched = LyricsResult(
            artist=returned_artist,
            song=returned_song,
            src=self.label,
            lyrics=returned_lyrics if plain else lrc_obj,  # type: ignore
            confidence=confidence,
            time=time_diff,
        )
        await self.redis_cache.increment_found_count(self.label)
        return matched
except Exception as e:
    logging.error("Exception in %s: %s", self.label, str(e))
    return None
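The exact/prefix/ILIKE ordering is a cost ladder: exact equality on the pre-lowered columns is cheapest, a prefix LIKE can still use an index when one is defined with a suitable pattern operator class, and a leading-wildcard ILIKE generally forces a scan. A minimal, self-contained sketch of the same tiered-fallback idea; the model and column names here are assumptions, not the project's actual classes:

# Hypothetical sketch of a tiered lookup: exact -> prefix -> substring.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


async def tiered_lookup(db: AsyncSession, model, artist: str, song: str):
    tiers = [
        # Exact match on pre-lowered columns (cheapest).
        (model.artist_name_lower == artist, model.name_lower == song),
        # Prefix match (index-friendly for LIKE 'abc%').
        (model.artist_name_lower.like(f"{artist}%"), model.name_lower.like(f"{song}%")),
        # Substring match (leading wildcard, usually a full scan).
        (model.artist_name_lower.ilike(f"%{artist}%"), model.name_lower.ilike(f"%{song}%")),
    ]
    for conditions in tiers:
        row = (await db.execute(select(model).filter(*conditions).limit(1))).first()
        if row:
            return row
    return None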

View File

@@ -13,7 +13,7 @@ from lyric_search import notifier
from lyric_search.constructors import LyricsResult from lyric_search.constructors import LyricsResult
import redis.asyncio as redis import redis.asyncio as redis
from redis.commands.search.query import Query # type: ignore from redis.commands.search.query import Query # type: ignore
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # type: ignore from redis.commands.search.index_definition import IndexDefinition, IndexType # type: ignore
from redis.commands.search.field import TextField, Field # type: ignore from redis.commands.search.field import TextField, Field # type: ignore
from redis.commands.json.path import Path # type: ignore from redis.commands.json.path import Path # type: ignore
from . import private from . import private

View File

@@ -17,7 +17,7 @@ from rapidfuzz import fuzz
from endpoints.constructors import RadioException from endpoints.constructors import RadioException
import redis.asyncio as redis import redis.asyncio as redis
from redis.commands.search.query import Query # noqa from redis.commands.search.query import Query # noqa
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # noqa from redis.commands.search.index_definition import IndexDefinition, IndexType # noqa
from redis.commands.search.field import TextField # noqa from redis.commands.search.field import TextField # noqa
from redis.commands.json.path import Path # noqa from redis.commands.json.path import Path # noqa
from lyric_search.sources import private from lyric_search.sources import private
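The import change in both files tracks a redis-py module rename: newer releases expose the RediSearch helpers under redis.commands.search.index_definition, while the camelCase indexDefinition path belongs to older versions. A small sketch of creating an index with the new path; the index name, key prefix, and fields are placeholders:

# Sketch: creating a RediSearch index with the renamed module path.
import redis.asyncio as redis
from redis.commands.search.field import TextField
from redis.commands.search.index_definition import IndexDefinition, IndexType


async def ensure_index(r: redis.Redis) -> None:
    try:
        await r.ft("example_idx").create_index(
            (
                TextField("$.artist", as_name="artist"),
                TextField("$.song", as_name="song"),
            ),
            definition=IndexDefinition(prefix=["example:"], index_type=IndexType.JSON),
        )
    except Exception:
        # Index already exists (or RediSearch is unavailable); ignore here.
        pass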
@@ -339,7 +339,10 @@ class RadioUtil:
time_start: float = time.time() time_start: float = time.time()
artist_genre: dict[str, str] = {} artist_genre: dict[str, str] = {}
query: str = ( query: str = (
"SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE" "SELECT REPLACE(GROUP_CONCAT(DISTINCT g.name), ',', ', ') AS genre FROM artists a "
"JOIN artist_genres ag ON a.id = ag.artist_id "
"JOIN genres g ON ag.genre_id = g.id "
"WHERE a.name LIKE ? COLLATE NOCASE"
) )
with sqlite3.connect(self.artist_genre_db_path) as _db: with sqlite3.connect(self.artist_genre_db_path) as _db:
_db.row_factory = sqlite3.Row _db.row_factory = sqlite3.Row
@@ -347,7 +350,7 @@ class RadioUtil:
params: tuple[str] = (f"%%{artist}%%",) params: tuple[str] = (f"%%{artist}%%",)
_cursor = _db.execute(query, params) _cursor = _db.execute(query, params)
res = _cursor.fetchone() res = _cursor.fetchone()
if not res: if not res or not res["genre"]:
artist_genre[artist] = "N/A" artist_genre[artist] = "N/A"
continue continue
artist_genre[artist] = res["genre"] artist_genre[artist] = res["genre"]
@@ -367,14 +370,17 @@ class RadioUtil:
try: try:
artist = artist.strip() artist = artist.strip()
query: str = ( query: str = (
"SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE" "SELECT REPLACE(GROUP_CONCAT(DISTINCT g.name), ',', ', ') AS genre FROM artists a "
"JOIN artist_genres ag ON a.id = ag.artist_id "
"JOIN genres g ON ag.genre_id = g.id "
"WHERE a.name LIKE ? COLLATE NOCASE"
) )
params: tuple[str] = (artist,) params: tuple[str] = (artist,)
with sqlite3.connect(self.playback_db_path, timeout=2) as _db: with sqlite3.connect(self.playback_db_path, timeout=2) as _db:
_db.row_factory = sqlite3.Row _db.row_factory = sqlite3.Row
_cursor = _db.execute(query, params) _cursor = _db.execute(query, params)
res = _cursor.fetchone() res = _cursor.fetchone()
if not res: if not res or not res["genre"]:
return "Not Found" # Exception suppressed return "Not Found" # Exception suppressed
# raise RadioException( # raise RadioException(
# f"Could not locate {artist} in artist_genre_map db." # f"Could not locate {artist} in artist_genre_map db."
@@ -480,18 +486,18 @@ class RadioUtil:
) )
"""Loading Complete""" """Loading Complete"""
self.playlists_loaded = True
# Request skip from LS to bring streams current # Request skip from LS to bring streams current
for playlist in self.playlists: for playlist in self.playlists:
logging.info("Skipping: %s", playlist) logging.info("Skipping: %s", playlist)
await self._ls_skip(playlist) await self._ls_skip(playlist)
self.playlists_loaded = True
except Exception as e: except Exception as e:
logging.info("Playlist load failed: %s", str(e)) logging.info("Playlist load failed: %s", str(e))
traceback.print_exc() traceback.print_exc()
def cache_album_art(self, track_id: int, file_path: str) -> None: def cache_album_art(self, track_id: int, file_path: str) -> None:
""" """
Cache Album Art to SQLite DB Cache Album Art to SQLite DB - IMPROVED VERSION
Args: Args:
track_id (int): Track ID to update track_id (int): Track ID to update
file_path (str): Path to file, for artwork extraction file_path (str): Path to file, for artwork extraction
@@ -499,30 +505,92 @@ class RadioUtil:
None None
""" """
try: try:
logging.info( # Validate file exists first
"cache_album_art: Attempting to store album art for track_id: %s", if not os.path.exists(file_path):
track_id, logging.warning("cache_album_art: File not found: %s", file_path)
) return
tagger = music_tag.load_file(file_path)
album_art = tagger["artwork"].first.data if tagger else None logging.info("cache_album_art: Attempting to store album art for track_id: %s", track_id)
with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
db_cursor = db_conn.execute( # Check if artwork already exists to avoid duplicates
"INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES(?, ?)", with sqlite3.connect(self.album_art_db_path, timeout=5) as db_conn:
( db_conn.row_factory = sqlite3.Row
track_id, cursor = db_conn.execute("SELECT track_id FROM album_art WHERE track_id = ?", (track_id,))
album_art, if cursor.fetchone():
), logging.debug("cache_album_art: Track %s already has album art", track_id)
) return
if isinstance(db_cursor.lastrowid, int):
db_conn.commit() # Load file with better error handling
try:
tagger = music_tag.load_file(file_path)
except Exception as e:
logging.warning("cache_album_art: Failed to load file %s: %s", file_path, e)
return
# Extract artwork with validation
album_art = None
try:
if not tagger:
logging.debug("cache_album_art: No tagger available for track %s", track_id)
return
artwork_field = tagger["artwork"]
if artwork_field and hasattr(artwork_field, 'first') and artwork_field.first:
first_artwork = artwork_field.first
if hasattr(first_artwork, 'data') and first_artwork.data:
potential_art = first_artwork.data
# Validate artwork data
if isinstance(potential_art, bytes) and len(potential_art) > 100:
# Check if it looks like valid image data
if (potential_art.startswith(b'\xff\xd8') or # JPEG
potential_art.startswith(b'\x89PNG') or # PNG
potential_art.startswith(b'GIF87a') or # GIF87a
potential_art.startswith(b'GIF89a') or # GIF89a
potential_art.startswith(b'RIFF')): # WEBP/other RIFF
album_art = potential_art
logging.debug("cache_album_art: Found valid artwork (%s bytes)", len(album_art))
else:
logging.warning("cache_album_art: Invalid artwork format for track %s - not caching", track_id)
return
else:
logging.debug("cache_album_art: No valid artwork data for track %s", track_id)
return
else:
logging.debug("cache_album_art: No artwork data available for track %s", track_id)
return
else: else:
logging.debug( logging.debug("cache_album_art: No artwork field for track %s", track_id)
"No row inserted for track_id: %s w/ file_path: %s", return
track_id,
file_path, except Exception as e:
logging.warning("cache_album_art: Error extracting artwork for track %s: %s", track_id, e)
return
# Only proceed if we have valid artwork
if not album_art:
logging.debug("cache_album_art: No valid artwork to cache for track %s", track_id)
return
# Insert into database
try:
with sqlite3.connect(self.album_art_db_path, timeout=5) as db_conn:
cursor = db_conn.execute(
"INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES (?, ?)",
(track_id, album_art)
) )
if cursor.rowcount == 1:
db_conn.commit()
logging.info("cache_album_art: Successfully cached %s bytes for track %s", len(album_art), track_id)
else:
logging.debug("cache_album_art: No row inserted for track_id: %s (may already exist)", track_id)
except Exception as e:
logging.error("cache_album_art: Database error for track %s: %s", track_id, e)
except Exception as e: except Exception as e:
logging.debug("cache_album_art Exception: %s", str(e)) logging.error("cache_album_art: Unexpected error for track %s: %s", track_id, e)
traceback.print_exc() traceback.print_exc()
def get_album_art(self, track_id: int) -> Optional[bytes]: def get_album_art(self, track_id: int) -> Optional[bytes]:
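The artwork caching now validates the raw bytes by file signature ("magic bytes") instead of trusting whatever the tag contains. The same checks as a standalone helper; the signature list mirrors the diff and is a sketch, not an exhaustive image detector:

# Sketch: validate raw artwork bytes by file signature before caching.
def looks_like_image(data: bytes, min_size: int = 100) -> bool:
    if not isinstance(data, (bytes, bytearray)) or len(data) <= min_size:
        return False
    signatures = (
        b"\xff\xd8",   # JPEG
        b"\x89PNG",    # PNG
        b"GIF87a",     # GIF (87a)
        b"GIF89a",     # GIF (89a)
        b"RIFF",       # WEBP and other RIFF containers
    )
    return any(bytes(data).startswith(sig) for sig in signatures)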

View File

@@ -45,6 +45,29 @@ load_dotenv()
sr = SRUtil() sr = SRUtil()
logger = logging.getLogger(__name__)
async def check_flac_stream(file_path):
"""Check if the given file contains a FLAC stream using ffprobe."""
cmd = [
"ffprobe",
"-v",
"error",
"-select_streams",
"a:0",
"-show_entries",
"stream=codec_name",
"-of",
"default=noprint_wrappers=1:nokey=1",
file_path,
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, _ = await process.communicate()
return b"flac" in stdout
# ---------- Discord helper ---------- # ---------- Discord helper ----------
async def discord_notify( async def discord_notify(
@@ -259,13 +282,16 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
) )
) )
async def process_tracks(): async def process_tracks(track_list):
per_track_meta = [] per_track_meta = []
all_final_files = [] all_final_files = []
all_artists = set() all_artists = set()
(ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True) (ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
# Ensure aiohttp session is properly closed
async with aiohttp.ClientSession(headers=HEADERS) as session: async with aiohttp.ClientSession(headers=HEADERS) as session:
print(f"DEBUG: Starting process_tracks with {len(track_list)} tracks")
# Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil # Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil
async def _rate_limit_notify(exc: Exception): async def _rate_limit_notify(exc: Exception):
try: try:
@@ -285,6 +311,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
pass pass
total = len(track_list or []) total = len(track_list or [])
for i, track_id in enumerate(track_list or []): for i, track_id in enumerate(track_list or []):
print(f"DEBUG: Processing track {i + 1}/{total}: {track_id}")
track_info = { track_info = {
"track_id": str(track_id), "track_id": str(track_id),
"status": "Pending", "status": "Pending",
@@ -300,31 +327,53 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
track_info["attempts"] = attempt track_info["attempts"] = attempt
try: try:
sr.get_cover_by_album_id print(f"DEBUG: Getting downloadable for track {track_id}")
url = await sr.get_stream_url_by_track_id(track_id, quality) # Fetch downloadable (handles DASH and others)
if not url: downloadable = await sr._safe_api_call(
raise RuntimeError("No stream URL") sr.streamrip_client.get_downloadable,
str(track_id),
2 if quality == "FLAC" else 1,
retries=3,
)
parsed = urlparse(url) print(f"DEBUG: Got downloadable: {type(downloadable)}")
clean_path = unquote(parsed.path) if not downloadable:
ext = Path(clean_path).suffix or ".mp3" raise RuntimeError("No downloadable created")
ext = f".{downloadable.extension}"
tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}") tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")
async with session.get(url) as resp: print(f"DEBUG: Starting download to {tmp_file}")
resp.raise_for_status() # Download
with open(tmp_file, "wb") as f: print(f"TRACK {track_id}: Starting download")
async for chunk in resp.content.iter_chunked(64 * 1024): try:
f.write(chunk) await downloadable._download(
str(tmp_file), callback=lambda x=None: None
)
print(
f"TRACK {track_id}: Download method completed normally"
)
except Exception as download_e:
print(
f"TRACK {track_id}: Download threw exception: {download_e}"
)
raise
print(
f"DEBUG: Download completed, file exists: {tmp_file.exists()}"
)
if not tmp_file.exists():
raise RuntimeError(
f"Download completed but no file created: {tmp_file}"
)
print(f"DEBUG: Fetching metadata for track {track_id}")
# Metadata fetch
try: try:
md = await sr.get_metadata_by_track_id(track_id) or {} md = await sr.get_metadata_by_track_id(track_id) or {}
print(f"DEBUG: Metadata fetched: {bool(md)}")
except MetadataFetchError as me: except MetadataFetchError as me:
# Permanent metadata failure — notify and continue (mark track failed) # Permanent metadata failure — mark failed and break
msg = f"Metadata permanently failed for track {track_id}: {me}"
try:
send_log_to_discord(msg, "ERROR", target)
except Exception:
pass
track_info["status"] = "Failed" track_info["status"] = "Failed"
track_info["error"] = str(me) track_info["error"] = str(me)
per_track_meta.append(track_info) per_track_meta.append(track_info)
@@ -333,6 +382,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.meta["progress"] = int(((i + 1) / total) * 100) job.meta["progress"] = int(((i + 1) / total) * 100)
job.save_meta() job.save_meta()
break break
artist_raw = md.get("artist") or "Unknown Artist" artist_raw = md.get("artist") or "Unknown Artist"
album_raw = md.get("album") or "Unknown Album" album_raw = md.get("album") or "Unknown Album"
title_raw = md.get("title") or f"Track {track_id}" title_raw = md.get("title") or f"Track {track_id}"
@@ -341,15 +391,19 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
album = sanitize_filename(album_raw) album = sanitize_filename(album_raw)
title = sanitize_filename(title_raw) title = sanitize_filename(title_raw)
print(f"TRACK {track_id}: Processing '{title}' by {artist}")
all_artists.add(artist) all_artists.add(artist)
album_dir = staging_root / artist / album album_dir = staging_root / artist / album
album_dir.mkdir(parents=True, exist_ok=True) album_dir.mkdir(parents=True, exist_ok=True)
final_file = ensure_unique_path(album_dir / f"{title}{ext}") final_file = ensure_unique_path(album_dir / f"{title}{ext}")
# Move file into final location first (tags will be updated on moved file) # Move to final location
print(f"TRACK {track_id}: Moving to final location...")
tmp_file.rename(final_file) tmp_file.rename(final_file)
print(f"TRACK {track_id}: File moved successfully")
# Try to fetch cover art via SRUtil (use album_id from metadata) # Fetch cover art
try: try:
album_field = md.get("album") album_field = md.get("album")
album_id = md.get("album_id") or ( album_id = md.get("album_id") or (
@@ -370,49 +424,46 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
else: else:
cover_url = md.get("cover_url") cover_url = md.get("cover_url")
# Embed tags + artwork using music_tag if available, falling back to mediafile tagging # Embed tags
embedded = False embedded = False
try: img_bytes = None
if cover_url: if cover_url:
try:
timeout = aiohttp.ClientTimeout(total=15)
async with session.get(
cover_url, timeout=timeout
) as img_resp:
if img_resp.status == 200:
img_bytes = await img_resp.read()
else:
img_bytes = None
# Notify Discord about failed cover download (HTTP error)
try:
send_log_to_discord(
f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`",
"WARNING",
target,
)
except Exception:
pass
except Exception as e:
img_bytes = None
# Notify Discord about exception during cover download
try:
send_log_to_discord(
f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`",
"WARNING",
target,
)
except Exception:
pass
else:
img_bytes = None
# Prefer music_tag if available (keeps compatibility with add_cover_art.py)
try: try:
from music_tag import load_file as mt_load_file # type: ignore timeout = aiohttp.ClientTimeout(total=15)
async with session.get(
cover_url, timeout=timeout
) as img_resp:
if img_resp.status == 200:
img_bytes = await img_resp.read()
else:
img_bytes = None
try:
send_log_to_discord(
f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`",
"WARNING",
target,
)
except Exception:
pass
except Exception as e:
img_bytes = None
try: try:
mf = mt_load_file(str(final_file)) send_log_to_discord(
# set basic tags f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`",
"WARNING",
target,
)
except Exception:
pass
# Try music_tag first
try:
from music_tag import load_file as mt_load_file # type: ignore
# Add validation for `mf` object
try:
mf = mt_load_file(str(final_file))
if mf is not None:
if md.get("title"): if md.get("title"):
mf["title"] = md.get("title") mf["title"] = md.get("title")
if md.get("artist"): if md.get("artist"):
@@ -429,37 +480,40 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
mf["artwork"] = img_bytes mf["artwork"] = img_bytes
mf.save() mf.save()
embedded = True embedded = True
except Exception: else:
logger.error("Failed to load file with music_tag.")
embedded = False embedded = False
except Exception: except Exception:
embedded = False embedded = False
# If music_tag not available or failed, fallback to mediafile tagging
if not embedded:
# If we had a cover_url but no bytes, log a warning to Discord
try:
if cover_url and not img_bytes:
send_log_to_discord(
f"Cover art not available for track {track_id} album_id={album_id} url={cover_url}",
"WARNING",
target,
)
except Exception:
pass
tag_with_mediafile(str(final_file), md)
except Exception: except Exception:
# Ensure at least the basic tags are written embedded = False
if not embedded:
try:
if cover_url and not img_bytes:
send_log_to_discord(
f"Cover art not available for track {track_id} album_id={album_id} url={cover_url}",
"WARNING",
target,
)
except Exception:
pass
try: try:
tag_with_mediafile(str(final_file), md) tag_with_mediafile(str(final_file), md)
except Exception: except Exception:
pass pass
tmp_file = None
# Success
tmp_file = None
track_info["status"] = "Success" track_info["status"] = "Success"
track_info["file_path"] = str(final_file) track_info["file_path"] = str(final_file)
track_info["error"] = None track_info["error"] = None
all_final_files.append(final_file) all_final_files.append(final_file)
print(
f"TRACK {track_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%"
)
if job: if job:
job.meta["progress"] = int(((i + 1) / total) * 100) job.meta["progress"] = int(((i + 1) / total) * 100)
job.meta["tracks"] = per_track_meta + [track_info] job.meta["tracks"] = per_track_meta + [track_info]
@@ -469,7 +523,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
except aiohttp.ClientResponseError as e: except aiohttp.ClientResponseError as e:
msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}" msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}"
send_log_to_discord(msg, "WARNING", target) send_log_to_discord(msg, "WARNING", target)
if e.status == 429: if getattr(e, "status", None) == 429:
wait_time = min(60, 2**attempt) wait_time = min(60, 2**attempt)
await asyncio.sleep(wait_time) await asyncio.sleep(wait_time)
else: else:
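The getattr(e, "status", None) guard tolerates response errors that carry no status attribute, and the retry wait is a capped exponential backoff (2, 4, 8, ... seconds, never more than 60). The same pattern in isolation, with optional jitter; the helper name and limits are illustrative:

# Illustrative capped exponential backoff for HTTP 429 responses.
import asyncio
import random

async def backoff_sleep(attempt: int, cap: float = 60.0, jitter: float = 0.5) -> None:
    delay = min(cap, 2 ** attempt) + random.uniform(0, jitter)  # cap, then add jitter
    await asyncio.sleep(delay)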
@@ -662,7 +716,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
return loop.run_until_complete(process_tracks()) return loop.run_until_complete(process_tracks(track_list))
except Exception as e: except Exception as e:
send_log_to_discord( send_log_to_discord(
f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target
@@ -672,3 +726,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.save_meta() job.save_meta()
finally: finally:
loop.close() loop.close()
# Correct integration of FLAC stream check
async def process_tracks(track_list):
for i, track_id in enumerate(track_list or []):
combined_path = f"/tmp/{uuid.uuid4().hex}_combined.m4s" # Example path
if not await check_flac_stream(combined_path):
logger.error(f"No FLAC stream found in {combined_path}. Skipping file.")
continue
# Proceed with decoding pipeline

View File

@@ -19,15 +19,21 @@ class MetadataFetchError(Exception):
"""Raised when metadata fetch permanently fails after retries.""" """Raised when metadata fetch permanently fails after retries."""
Removed:

# Suppress all logging output from this module and its children
for name in [__name__, "utils.sr_wrapper"]:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)  # Temporarily set to INFO for debugging LRC
    logger.propagate = False
    for handler in logger.handlers:
        handler.setLevel(logging.INFO)
# Also set the root logger to CRITICAL as a last resort (may affect global logging)
# logging.getLogger().setLevel(logging.CRITICAL)

Added:

# Suppress noisy logging from this module and from the `streamrip` library
# We set propagate=False so messages don't bubble up to the root logger and
# attach a NullHandler where appropriate to avoid "No handler found" warnings.
for name in [__name__, "utils.sr_wrapper", "streamrip", "streamrip.client"]:
    logger = logging.getLogger(name)
    # Keep default level (or raise to WARNING) so non-important logs are dropped
    try:
        logger.setLevel(logging.WARNING)
    except Exception:
        pass
    logger.propagate = False
    # Ensure a NullHandler is present so logs don't propagate and no missing-handler
    # warnings are printed when the package emits records.
    if not any(isinstance(h, logging.NullHandler) for h in logger.handlers):
        logger.addHandler(logging.NullHandler())
load_dotenv() load_dotenv()
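Setting propagate = False and attaching a NullHandler is the standard way to mute a chatty third-party logger without touching the root logger's configuration. The same pattern as a reusable helper:

# Standard pattern for silencing a noisy library logger.
import logging

def silence_logger(name: str, level: int = logging.WARNING) -> None:
    noisy = logging.getLogger(name)
    noisy.setLevel(level)      # drop records below this level
    noisy.propagate = False    # do not bubble up to the root logger
    if not any(isinstance(h, logging.NullHandler) for h in noisy.handlers):
        noisy.addHandler(logging.NullHandler())  # avoid "no handler" warnings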
@@ -190,12 +196,23 @@ class SRUtil:
title_match = self.is_fuzzy_match(expected_title, found_title, threshold) title_match = self.is_fuzzy_match(expected_title, found_title, threshold)
return artist_match and album_match and title_match return artist_match and album_match and title_match
Removed:

    def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
        deduped = {}
        for entry in entries:
            norm = entry[key].strip().lower()
            if norm not in deduped:
                deduped[norm] = entry
        return list(deduped.values())

Added:

    def dedupe_by_key(
        self, key: str | list[str], entries: list[dict]
    ) -> list[dict]:
        """Return entries de-duplicated by one or more keys."""
        keys = [key] if isinstance(key, str) else list(key)
        if not keys:
            return entries

        def normalize(value: Any) -> str:
            return str(value or "").strip().lower()

        deduped: dict[tuple[str, ...], dict] = {}
        for entry in entries:
            composite_key = tuple(normalize(entry.get(k)) for k in keys)
            if composite_key not in deduped:
                deduped[composite_key] = entry
        return list(deduped.values())
def group_artists_by_name( def group_artists_by_name(
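With composite keys, two entries are only collapsed when every key matches after normalization, which is what the Tidal duplicate fix relies on: albums with the same title but different release dates now survive. A small usage sketch, assuming an SRUtil instance named sr and made-up data:

# Usage sketch for the composite-key dedupe; data is made up.
albums = [
    {"title": "Greatest Hits", "releaseDate": "1999-01-01"},
    {"title": "Greatest Hits", "releaseDate": "2010-05-01"},   # remaster, kept
    {"title": "greatest hits ", "releaseDate": "1999-01-01"},  # true duplicate, dropped
]
deduped = sr.dedupe_by_key(["title", "releaseDate"], albums)
assert len(deduped) == 2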
@@ -444,7 +461,7 @@ class SRUtil:
return None return None
if not metadata: if not metadata:
return None return None
albums = self.dedupe_by_key("title", metadata.get("albums", [])) albums = self.dedupe_by_key(["title", "releaseDate"], metadata.get("albums", []))
albums_out = [ albums_out = [
{ {
"artist": ", ".join(artist["name"] for artist in album["artists"]), "artist": ", ".join(artist["name"] for artist in album["artists"]),
@@ -684,21 +701,22 @@ class SRUtil:
except Exception as e: except Exception as e:
# Exponential backoff with jitter for 429 or other errors # Exponential backoff with jitter for 429 or other errors
delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5) delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
logging.warning(
"Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs",
track_id,
attempt,
self.MAX_METADATA_RETRIES,
str(e),
delay,
)
if attempt < self.MAX_METADATA_RETRIES: if attempt < self.MAX_METADATA_RETRIES:
logging.warning(
"Retrying metadata fetch for track %s (attempt %d/%d): %s. Next retry in %.2fs",
track_id,
attempt,
self.MAX_METADATA_RETRIES,
str(e),
delay,
)
await asyncio.sleep(delay) await asyncio.sleep(delay)
else: else:
logging.error( logging.error(
"Metadata fetch failed permanently for track %s after %d attempts", "Metadata fetch failed permanently for track %s after %d attempts: %s",
track_id, track_id,
self.MAX_METADATA_RETRIES, self.MAX_METADATA_RETRIES,
str(e),
) )
# Raise a specific exception so callers can react (e.g. notify) # Raise a specific exception so callers can react (e.g. notify)
raise MetadataFetchError( raise MetadataFetchError(
@@ -788,7 +806,7 @@ class SRUtil:
tracks_with_diff.sort(key=lambda x: x[1]) tracks_with_diff.sort(key=lambda x: x[1])
best_track, min_diff = tracks_with_diff[0] best_track, min_diff = tracks_with_diff[0]
logging.info(f"SR: Best match duration diff: {min_diff}s") logging.info(f"SR: Best match duration diff: {min_diff}s")
# If the closest match is more than 5 seconds off, consider no match # If the closest match is more than 10 seconds off, consider no match
if min_diff > 10: if min_diff > 10:
logging.info("SR: Duration diff too large, no match") logging.info("SR: Duration diff too large, no match")
return None return None
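The duration check picks the candidate whose length is closest to the requested track and rejects everything when even the best candidate is more than 10 seconds off. The same selection logic in isolation; the threshold and field names are illustrative:

# Illustrative best-by-duration selection with a 10-second cutoff.
from typing import Optional

def pick_by_duration(tracks: list[dict], target: int, max_diff: int = 10) -> Optional[dict]:
    scored = sorted(
        ((t, abs(t.get("duration", 0) - target)) for t in tracks),
        key=lambda pair: pair[1],
    )
    if not scored:
        return None
    best, diff = scored[0]
    return best if diff <= max_diff else None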