Compare commits

..

18 Commits

Author SHA1 Message Date
6240888ac5 cull transcriptions endpoints from README.md 2025-12-03 13:56:28 -05:00
be9ed777a5 retire transcriptions endpoints 2025-12-03 13:55:52 -05:00
7779049c93 misc / sr_wrapper: only consider an album returned from Tidal to be a duplicate if both the name AND release date match. 2025-11-26 15:09:43 -05:00
41d9065c79 rm 2025-11-25 13:08:20 -05:00
85298b861d minor 2025-11-25 13:06:07 -05:00
476c4e6e51 bugfix: LRCLib's models.py needs load_dotenv 2025-11-23 07:12:38 -05:00
353f14c899 formatting / minor 2025-11-22 21:43:48 -05:00
3d0b867427 Misc 2025-11-22 21:41:12 -05:00
dcc6c7b24e More progress re: #34
- Change of direction: LRCLib searches from /lyric/search now use an internal cache, which is a PGSQL import of the LRCLib SQLite database. The change to PGSQL was made for performance.
2025-11-22 13:13:03 -05:00
c302b256d3 Begin #34 2025-11-21 12:29:12 -05:00
c6d2bad79d Enhance lyric search functionality by improving line splitting logic and adding multi-line matching for subsearch. Update cache handling to ensure confidence threshold is respected before returning results. 2025-10-24 13:40:55 -04:00
25d1ab226e Enhance Cync authentication flow with improved token management and 2FA handling. Add periodic token validation and logging for better debugging. Introduce FLAC stream check in bulk download process. 2025-10-22 06:55:37 -04:00
3f66223328 Fix import statement for index_definition in redis_cache.py and radio_util.py (related to a dependency upgrade of the redis module) 2025-10-15 12:13:03 -04:00
c493f2aabf Increase rate limit for lighting state requests and enhance error handling for Cync device operations. Improve lyric search processing by splitting lyrics based on line breaks and cleaning special characters (bugfix for subsearch/seek). 2025-10-15 10:10:56 -04:00
0029a9ec19 . 2025-10-07 12:07:45 -04:00
90c3efbb8b Remove .gitignore file, radio database restructuring 2025-10-07 12:07:13 -04:00
a61970d298 . 2025-10-03 11:34:48 -04:00
fb94750b46 - Removed unnecessary/unrelated files
- Added Scalar
2025-10-03 11:34:33 -04:00
21 changed files with 1183 additions and 2488 deletions

.gitignore (vendored)

@@ -1,36 +0,0 @@
**/__pycache__/*
**/.vscode/*
**/private.py
**/solibs/*
**/*.json
**/mgr/*
constants.py
tests.py
db_migrate.py
notifier.py
test_hifi.py
youtube*
playlist_creator.py
artist_genre_tag.py
pg_migrate_lyrics.py
uv.lock
pyproject.toml
mypy.ini
.python-version
get_next_track.py
endpoints/radio.py
utils/radio_util.py
redis_playlist.py
endpoints/auth.py
endpoints/radio2
endpoints/radio2/**
hash_password.py
up.py
job_review.py
check_missing.py
**/auth/*
**/radio_api/*
test/db_stats.py
test/report/*
.gitignore
.env

README.md

@@ -4,7 +4,10 @@ A modern FastAPI-based backend providing various endpoints for media, authentica
## Overview
This server is built with [FastAPI](https://fastapi.tiangolo.com/) and provides a comprehensive API for multiple services. The interactive API documentation is available at the [Swagger UI](https://api.codey.lol/docs).
This server is built with [FastAPI](https://fastapi.tiangolo.com/) and provides a comprehensive API for multiple services. API documentation is available in three formats:
- **Swagger UI**: [https://api.codey.lol/docs](https://api.codey.lol/docs) - Classic interactive API explorer with "Try it out" functionality
- **Scalar**: [https://api.codey.lol/scalar](https://api.codey.lol/scalar) - Modern, fast interactive API documentation (recommended)
- **ReDoc**: [https://api.codey.lol/redoc](https://api.codey.lol/redoc) - Clean, read-only documentation with better visual design
## API Endpoints
@@ -48,10 +51,6 @@ This server is built with [FastAPI](https://fastapi.tiangolo.com/) and provides
**YouTube (`/yt/`)** 🎥
- `POST /yt/search` - Search for YouTube videos by title
**Transcriptions (`/transcriptions/`)** 📝
- `POST /transcriptions/get_episodes` - Get episode list by show ID
- `POST /transcriptions/get_episode_lines` - Get transcript lines for specific episodes
**Lyric Search (`/lyric/`)** 🎵
- `POST /lyric/search` - Search for song lyrics across multiple sources
- Supports artist/song search, text search within lyrics
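The three documentation URLs listed above map onto the FastAPI configuration introduced in the base.py diff below; condensed here as a standalone sketch (application metadata omitted), assuming scalar_fastapi is installed:

    # Condensed sketch of the documentation setup added in base.py below;
    # not the full application, and the title string is taken from that diff.
    from fastapi import FastAPI
    from scalar_fastapi import get_scalar_api_reference

    app = FastAPI(docs_url="/docs", redoc_url="/redoc")

    @app.get("/scalar", include_in_schema=False)
    def scalar_docs():
        return get_scalar_api_reference(openapi_url="/openapi.json", title="codey.lol API")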

base.py

@@ -7,6 +7,7 @@ import asyncio
from typing import Any
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from scalar_fastapi import get_scalar_api_reference
from lyric_search.sources import redis_cache
logging.basicConfig(level=logging.INFO)
@@ -21,6 +22,8 @@ app = FastAPI(
contact={"name": "codey"},
redirect_slashes=False,
loop=loop,
docs_url="/docs", # Swagger UI (default)
redoc_url="/redoc", # ReDoc UI (default, but explicitly set)
)
constants = importlib.import_module("constants").Constants()
@@ -33,6 +36,7 @@ origins = [
"https://status.boatson.boats",
"https://_new.codey.lol",
"http://localhost:4321",
"https://local.codey.lol:4321",
]
app.add_middleware(
@@ -44,6 +48,12 @@ app.add_middleware(
) # type: ignore
# Add Scalar API documentation endpoint (before blacklist routes)
@app.get("/scalar", include_in_schema=False)
def scalar_docs():
return get_scalar_api_reference(openapi_url="/openapi.json", title="codey.lol API")
"""
Blacklisted routes
"""
@@ -62,10 +72,15 @@ def base_head():
@app.get("/{path}", include_in_schema=False)
def disallow_get_any(request: Request, var: Any = None):
path = request.path_params["path"]
allowed_paths = ["widget", "misc/no", "docs", "redoc", "scalar", "openapi.json"]
logging.info(
f"Checking path: {path}, allowed: {path in allowed_paths or path.split('/', maxsplit=1)[0] in allowed_paths}"
)
if not (
isinstance(path, str)
and (path.split("/", maxsplit=1) == "widget" or path == "misc/no")
and (path.split("/", maxsplit=1)[0] in allowed_paths or path in allowed_paths)
):
logging.error(f"BLOCKED path: {path}")
return util.get_blocked_response()
else:
logging.info("OK, %s", path)
@@ -88,9 +103,6 @@ routes: dict = {
"randmsg": importlib.import_module("endpoints.rand_msg").RandMsg(
app, util, constants
),
"transcriptions": importlib.import_module(
"endpoints.transcriptions"
).Transcriptions(app, util, constants),
"lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch(
app, util, constants
),
@@ -117,7 +129,6 @@ if radio_endpoint:
End Actionable Routes
"""
"""
Startup
"""


@@ -3,6 +3,9 @@ import json
import os
import time
import aiohttp
import asyncio
import traceback
from datetime import datetime
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
@@ -14,10 +17,75 @@ from pycync.user import User # type: ignore
from pycync.cync import Cync as Cync # type: ignore
from pycync import Auth # type: ignore
from pycync.exceptions import TwoFactorRequiredError, AuthFailedError # type: ignore
import inspect
import getpass
from typing import Optional
# Configure logging to write to a file for specific events
logging.basicConfig(
filename="cync_auth_events.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
def _mask_token(token: Optional[str]) -> str:
"""Mask sensitive token data for logging, showing only first/last 4 chars."""
if not token or len(token) < 8:
return "<invalid_token>"
return f"{token[:4]}...{token[-4:]}"
def _log_token_state(user, context: str):
"""Log masked token state for debugging."""
if not user:
logging.info(f"{context} - No user object available")
return
try:
logging.info(
f"{context} - Token state: access=%s refresh=%s expires_at=%s",
_mask_token(getattr(user, "access_token", None)),
_mask_token(getattr(user, "refresh_token", None)),
getattr(user, "expires_at", None),
)
except Exception as e:
logging.error(f"Error logging token state: {e}")
class Lighting(FastAPI):
async def ensure_cync_connection(self):
async def _close_session_safely(self):
"""Safely close the current session if it exists."""
if self.session and not getattr(self.session, "closed", True):
try:
await self.session.close()
# Wait a bit for the underlying connections to close
await asyncio.sleep(0.1)
except Exception as e:
logging.warning(f"Error closing session: {e}")
self.session = None
self.auth = None
self.cync_api = None
async def _test_connection_health(self) -> bool:
"""Test if the current connection is healthy by making a simple API call."""
if (
not self.cync_api
or not self.session
or getattr(self.session, "closed", True)
):
return False
try:
# Make a simple API call to test connectivity
devices = self.cync_api.get_devices()
# Just check if we get a response without errors
return devices is not None
except Exception as e:
logging.warning(f"Connection health check failed: {e}")
return False
async def ensure_cync_connection(self, force_reconnect: bool = False):
"""Ensure aiohttp session and Cync API are alive, re-create if needed."""
# Check required environment variables
missing_vars = []
@@ -31,58 +99,168 @@ class Lighting(FastAPI):
raise Exception(
f"Missing required environment variables: {', '.join(missing_vars)}"
)
# Cast to str after check to silence linter
cync_email: str = self.cync_email # type: ignore
cync_password: str = self.cync_password # type: ignore
# Check if session is closed or missing
if not self.session or getattr(self.session, "closed", False):
self.session = aiohttp.ClientSession()
# If force_reconnect is True or connection is unhealthy, rebuild everything
if force_reconnect or not await self._test_connection_health():
logging.info(
"Connection unhealthy or force reconnect requested. Rebuilding connection..."
)
# Clean up existing connection
await self._close_session_safely()
# Create new session with timeout configuration
timeout = aiohttp.ClientTimeout(total=30, connect=10)
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=30,
ttl_dns_cache=300,
use_dns_cache=True,
keepalive_timeout=60,
enable_cleanup_closed=True,
)
self.session = aiohttp.ClientSession(timeout=timeout, connector=connector)
# Load cached token and check validity
self.cync_user = None
cached_user = self._load_cached_user()
if cached_user:
token_status = None
if cached_user and hasattr(cached_user, "expires_at"):
# Add buffer time - consider token expired if less than 5 minutes remaining
buffer_time = 300 # 5 minutes
if cached_user.expires_at > (time.time() + buffer_time):
token_status = "valid"
else:
token_status = "expired"
else:
token_status = "no cached user or missing expires_at"
logging.info(f"Cync token status: {token_status}")
if token_status == "valid" and cached_user is not None:
# Use cached token
self.auth = Auth(
session=self.session,
user=cached_user,
username=cync_email,
password=cync_password,
)
self.cync_user = cached_user
logging.info("Reusing valid cached token, no 2FA required.")
else:
# Need fresh login - clear any cached user that's expired
if token_status == "expired":
try:
os.remove(self.token_cache_path)
logging.info("Removed expired token cache")
except (OSError, FileNotFoundError):
pass
logging.info("Initializing new Auth instance...")
self.auth = Auth(
session=self.session, username=cync_email, password=cync_password
session=self.session,
username=cync_email,
password=cync_password,
)
# Try to refresh token
self.cync_user = None
if (
self.auth.user
and hasattr(self.auth.user, "expires_at")
and self.auth.user.expires_at > time.time()
):
try:
await self.auth.async_refresh_user_token()
self.cync_user = self.auth.user
self._save_cached_user(self.cync_user)
except AuthFailedError:
pass
# If no valid token, login
if not self.cync_user:
try:
logging.info("Attempting fresh login...")
self.cync_user = await self.auth.login()
_log_token_state(self.cync_user, "After fresh login")
self._save_cached_user(self.cync_user)
logging.info("Fresh login successful")
except TwoFactorRequiredError:
logging.error(
"Cync 2FA required. Set CYNC_2FA_CODE in env if needed."
)
raise Exception("Cync 2FA required.")
twofa_code = os.getenv("CYNC_2FA_CODE")
if not twofa_code:
print("Cync 2FA required. Please enter your code:")
twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
self.cync_user = await self.auth.login(
two_factor_code=twofa_code
)
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
logging.info(
"2FA failure details: Code=%s, User=%s",
twofa_code,
self.cync_user,
)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
self.cync_api = await Cync.create(self.auth)
# Also check if cync_api is None (shouldn't happen, but just in case)
if not self.cync_api:
if not self.auth:
logging.critical("self.auth: %s", self.auth)
return
self.cync_api = await Cync.create(self.auth)
# Create new Cync API instance
try:
logging.info("Creating Cync API instance...")
_log_token_state(self.auth.user, "Before Cync.create")
self.cync_api = await Cync.create(self.auth)
logging.info("Cync API connection established successfully")
except Exception as e:
logging.error("Failed to create Cync API instance")
logging.error("Exception details: %s", str(e))
logging.error("Traceback:\n%s", traceback.format_exc())
# Save diagnostic info
diagnostic_data = {
"timestamp": datetime.now().isoformat(),
"error_type": type(e).__name__,
"error_message": str(e),
"auth_state": {
"has_auth": bool(self.auth),
"has_user": bool(getattr(self.auth, "user", None)),
"user_state": {
"access_token": _mask_token(
getattr(self.auth.user, "access_token", None)
)
if self.auth and self.auth.user
else None,
"refresh_token": _mask_token(
getattr(self.auth.user, "refresh_token", None)
)
if self.auth and self.auth.user
else None,
"expires_at": getattr(self.auth.user, "expires_at", None)
if self.auth and self.auth.user
else None,
}
if self.auth and self.auth.user
else None,
},
}
diagnostic_file = f"cync_api_failure-{int(time.time())}.json"
try:
with open(diagnostic_file, "w") as f:
json.dump(diagnostic_data, f, indent=2)
logging.info(
f"Saved API creation diagnostic data to {diagnostic_file}"
)
except Exception as save_error:
logging.error(f"Failed to save diagnostic data: {save_error}")
raise
# Final validation
if (
not self.cync_api
or not self.session
or getattr(self.session, "closed", True)
):
logging.error("Connection validation failed after setup")
_log_token_state(
getattr(self.auth, "user", None), "Failed connection validation"
)
raise Exception("Failed to establish proper Cync connection")
"""
Lighting Endpoints
@@ -108,6 +286,7 @@ class Lighting(FastAPI):
self.auth = None
self.cync_user = None
self.cync_api = None
self.health_check_task: Optional[asyncio.Task] = None
# Set up Cync connection at startup using FastAPI event
@app.on_event("startup")
@@ -125,76 +304,193 @@ class Lighting(FastAPI):
f"Missing required environment variables: {', '.join(missing_vars)}"
)
self.session = aiohttp.ClientSession()
cached_user = self._load_cached_user()
if cached_user:
self.auth = Auth(
session=self.session,
user=cached_user,
username=self.cync_email or "",
password=self.cync_password or "",
)
else:
self.auth = Auth(
session=self.session,
username=self.cync_email or "",
password=self.cync_password or "",
)
# Try to refresh token
if (
self.auth.user
and hasattr(self.auth.user, "expires_at")
and self.auth.user.expires_at > time.time()
):
try:
await self.auth.async_refresh_user_token()
self.cync_user = self.auth.user
self._save_cached_user(self.cync_user)
except AuthFailedError:
pass
# If no valid token, login
if not self.cync_user:
try:
self.cync_user = await self.auth.login()
self._save_cached_user(self.cync_user)
except TwoFactorRequiredError:
logging.error(
"Cync 2FA required. Set CYNC_2FA_CODE in env if needed."
)
raise Exception("Cync 2FA required.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
# Create persistent Cync API object
self.cync_api = await Cync.create(self.auth)
# Use ensure_cync_connection which has proper token caching
try:
await self.ensure_cync_connection()
logging.info("Cync lighting system initialized successfully")
except Exception as e:
logging.error(f"Failed to initialize Cync connection at startup: {e}")
# Don't raise - allow server to start, connection will be retried on first request
# Schedule periodic token validation and connection health checks
self.health_check_task = asyncio.create_task(self._schedule_health_checks())
@app.on_event("shutdown")
async def shutdown_event():
# Cancel health check task
if self.health_check_task and not self.health_check_task.done():
self.health_check_task.cancel()
try:
await self.health_check_task
except asyncio.CancelledError:
logging.info("Health check task cancelled successfully")
pass
# Clean up connections
await self._close_session_safely()
logging.info("Cync lighting system shut down cleanly")
# Register endpoints
self.endpoints: dict = {
"lighting/state": self.get_lighting_state,
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(
self.app.add_api_route(
f"/{endpoint}",
handler,
methods=["GET"],
include_in_schema=True,
dependencies=[
Depends(RateLimiter(times=10, seconds=2)),
Depends(RateLimiter(times=25, seconds=2)),
Depends(get_current_user),
],
)
app.add_api_route(
self.app.add_api_route(
"/lighting/state",
self.set_lighting_state,
methods=["POST"],
include_in_schema=True,
dependencies=[
Depends(RateLimiter(times=10, seconds=2)),
Depends(RateLimiter(times=25, seconds=2)),
Depends(get_current_user),
],
)
async def _refresh_or_login(self):
if not self.auth:
logging.error("Auth object is not initialized.")
raise Exception("Cync authentication not initialized.")
try:
user = getattr(self.auth, "user", None)
_log_token_state(user, "Before refresh attempt")
if user and hasattr(user, "expires_at") and user.expires_at > time.time():
refresh = getattr(self.auth, "async_refresh_user_token", None)
if callable(refresh):
try:
logging.info("Attempting token refresh...")
result = refresh()
if inspect.isawaitable(result):
await result
logging.info(
"Token refresh completed successfully (awaited)"
)
else:
logging.info("Token refresh completed (non-awaitable)")
except AuthFailedError as e:
logging.error("Token refresh failed with AuthFailedError")
logging.error("Exception details: %s", str(e))
logging.error("Traceback:\n%s", traceback.format_exc())
# Save diagnostic info to file
diagnostic_data = {
"timestamp": datetime.now().isoformat(),
"error_type": "AuthFailedError",
"error_message": str(e),
"user_state": {
"access_token": _mask_token(
getattr(user, "access_token", None)
),
"refresh_token": _mask_token(
getattr(user, "refresh_token", None)
),
"expires_at": getattr(user, "expires_at", None),
},
}
try:
diagnostic_file = (
f"cync_auth_failure-{int(time.time())}.json"
)
with open(diagnostic_file, "w") as f:
json.dump(diagnostic_data, f, indent=2)
logging.info(f"Saved diagnostic data to {diagnostic_file}")
except Exception as save_error:
logging.error(
f"Failed to save diagnostic data: {save_error}"
)
raise
login = getattr(self.auth, "login", None)
if callable(login):
try:
result = login()
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in successfully.")
except TwoFactorRequiredError:
twofa_code = os.getenv("CYNC_2FA_CODE")
if not twofa_code:
# Prompt interactively if not set
print("Cync 2FA required. Please enter your code:")
twofa_code = getpass.getpass("2FA Code: ")
if twofa_code:
logging.info("Retrying Cync login with 2FA code.")
try:
result = login(two_factor_code=twofa_code)
if inspect.isawaitable(result):
self.cync_user = await result
else:
self.cync_user = result
self._save_cached_user(self.cync_user)
logging.info("Logged in with 2FA successfully.")
except Exception as e:
logging.error("Cync 2FA login failed: %s", e)
logging.info(
"2FA failure details: Code=%s, User=%s",
twofa_code,
self.cync_user,
)
raise Exception("Cync 2FA code invalid or not accepted.")
else:
logging.error("Cync 2FA required but no code provided.")
raise Exception("Cync 2FA required.")
else:
raise Exception("Auth object missing login method.")
except AuthFailedError as e:
logging.error("Failed to authenticate with Cync API: %s", e)
raise Exception("Cync authentication failed.")
except Exception as e:
logging.error("Unexpected error during authentication: %s", e)
raise
async def _schedule_health_checks(self):
"""Periodic health checks and token validation."""
while True:
try:
await asyncio.sleep(300) # Check every 5 minutes
# Check token expiration (refresh if less than 10 minutes left)
if self.cync_user and hasattr(self.cync_user, "expires_at"):
expires_at = getattr(self.cync_user, "expires_at", 0)
time_until_expiry = expires_at - time.time()
if time_until_expiry < 600: # Less than 10 minutes
logging.info(
f"Token expires in {int(time_until_expiry / 60)} minutes. Refreshing..."
)
try:
await self._refresh_or_login()
except Exception as e:
logging.error(
f"Token refresh failed during health check: {e}"
)
# Test connection health
if not await self._test_connection_health():
logging.warning(
"Connection health check failed. Will reconnect on next API call."
)
except asyncio.CancelledError:
logging.info("Health check task cancelled")
break
except Exception as e:
logging.error(f"Error during periodic health check: {e}")
# Continue the loop even on errors
def _load_cached_user(self):
try:
if os.path.exists(self.token_cache_path):
@@ -253,8 +549,10 @@ class Lighting(FastAPI):
"""
Set the lighting state and apply it to the Cync device.
"""
logging.info("=== LIGHTING STATE REQUEST RECEIVED ===")
try:
state = await request.json()
logging.info(f"Requested state: {state}")
# Validate state (basic validation)
if not isinstance(state, dict):
raise HTTPException(
@@ -266,7 +564,7 @@ class Lighting(FastAPI):
await self.ensure_cync_connection()
# Apply to Cync device
# Validate and extract state values
power = state.get("power", "off")
if power not in ["on", "off"]:
raise HTTPException(
@@ -296,41 +594,134 @@ class Lighting(FastAPI):
else:
rgb = None
# Use persistent Cync API object
if not self.cync_api:
raise HTTPException(status_code=500, detail="Cync API not initialized.")
devices = self.cync_api.get_devices()
if not devices or not isinstance(devices, (list, tuple)):
raise HTTPException(
status_code=500, detail="No devices returned from Cync API."
)
light = next(
(
d
for d in devices
if hasattr(d, "name") and d.name == self.cync_device_name
),
None,
# Apply to Cync device with robust retry and error handling
max_retries = 3
last_exception: Exception = Exception("No attempts made")
for attempt in range(max_retries):
try:
# Ensure connection before each attempt
force_reconnect = attempt > 0 # Force reconnect on retries
await self.ensure_cync_connection(force_reconnect=force_reconnect)
if not self.cync_api:
raise Exception("Cync API not available after connection setup")
logging.info(
f"Attempt {attempt + 1}/{max_retries}: Getting devices from Cync API..."
)
devices = self.cync_api.get_devices()
if not devices:
raise Exception("No devices returned from Cync API")
logging.info(
f"Devices returned: {[getattr(d, 'name', 'unnamed') for d in devices]}"
)
light = next(
(
d
for d in devices
if hasattr(d, "name") and d.name == self.cync_device_name
),
None,
)
if not light:
available_devices = [
getattr(d, "name", "unnamed") for d in devices
]
raise Exception(
f"Device '{self.cync_device_name}' not found. Available devices: {available_devices}"
)
logging.info(
f"Selected device: {getattr(light, 'name', 'unnamed')}"
)
# Execute device operations
operations_completed = []
# Set power
if power == "on":
result = await light.turn_on()
operations_completed.append(f"turn_on: {result}")
else:
result = await light.turn_off()
operations_completed.append(f"turn_off: {result}")
# Set brightness
if "brightness" in state:
result = await light.set_brightness(brightness)
operations_completed.append(
f"set_brightness({brightness}): {result}"
)
# Set color
if rgb:
result = await light.set_rgb(rgb)
operations_completed.append(f"set_rgb({rgb}): {result}")
logging.info(
f"All operations completed successfully: {operations_completed}"
)
break # Success, exit retry loop
except (
aiohttp.ClientConnectionError,
aiohttp.ClientOSError,
aiohttp.ServerDisconnectedError,
aiohttp.ClientConnectorError,
ConnectionResetError,
ConnectionError,
OSError,
asyncio.TimeoutError,
) as e:
last_exception = e
logging.warning(
f"Connection/network error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
)
if attempt < max_retries - 1:
# Wait a bit before retry to allow network/server recovery
await asyncio.sleep(
2**attempt
) # Exponential backoff: 1s, 2s, 4s
continue
except (AuthFailedError, TwoFactorRequiredError) as e:
last_exception = e
logging.error(
f"Authentication error (attempt {attempt + 1}/{max_retries}): {e}"
)
if attempt < max_retries - 1:
# Clear cached tokens on auth errors
try:
os.remove(self.token_cache_path)
logging.info("Cleared token cache due to auth error")
except (OSError, FileNotFoundError):
pass
await asyncio.sleep(1)
continue
except Exception as e:
last_exception = e
error_msg = f"Unexpected error (attempt {attempt + 1}/{max_retries}): {type(e).__name__}: {e}"
logging.error(error_msg)
# On unexpected errors, try reconnecting for next attempt
if attempt < max_retries - 1:
logging.warning(
"Forcing full reconnection due to unexpected error..."
)
await asyncio.sleep(1)
continue
# If we get here, all retries failed
logging.error(
f"All {max_retries} attempts failed. Last error: {type(last_exception).__name__}: {last_exception}"
)
if not light:
raise HTTPException(
status_code=404,
detail=f"Device '{self.cync_device_name}' not found",
)
# Set power
if power == "on":
await light.turn_on()
else:
await light.turn_off()
# Set brightness
if "brightness" in state:
await light.set_brightness(brightness)
# Set color
if rgb:
await light.set_rgb(rgb)
raise last_exception
logging.info(
"Successfully applied state to device '%s': %s",


@@ -1,4 +1,3 @@
import logging
import os
import urllib.parse
import regex
@@ -81,9 +80,9 @@ class LyricSearch(FastAPI):
)
for endpoint, handler in self.endpoints.items():
times: int = 20
times: int = 5
seconds: int = 2
rate_limit: tuple[int, int] = (2, 3) # Default; (Times, Seconds)
rate_limit: tuple[int, int] = (times, seconds) # Default; (Times, Seconds)
_schema_include = endpoint in ["lyric/search"]
if (
@@ -187,7 +186,7 @@ class LyricSearch(FastAPI):
},
)
excluded_sources: Optional[list] = data.excluded_sources
excluded_sources: list = data.excluded_sources or []
aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
plain_lyrics: bool = not data.lrc
result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search(
@@ -210,29 +209,93 @@ class LyricSearch(FastAPI):
if data.sub and not data.lrc:
seeked_found_line: Optional[int] = None
lyric_lines: list[str] = result["lyrics"].strip().split(" / ")
# Split lyrics into lines based on <br>, newline characters, or " / "
lyrics_text = result["lyrics"].strip()
# Determine the delimiter and split accordingly
if "<br>" in lyrics_text:
lyric_lines = lyrics_text.split("<br>")
separator = "<br>"
elif " / " in lyrics_text:
lyric_lines = lyrics_text.split(" / ")
separator = " / "
else:
lyric_lines = lyrics_text.split("\n")
separator = "\n"
search_term = data.sub.strip().lower()
# First try single-line matching (existing behavior)
for i, line in enumerate(lyric_lines):
line = regex.sub(r"\u2064", "", line.strip())
if data.sub.strip().lower() in line.strip().lower():
# Remove any special characters and extra spaces
cleaned_line = regex.sub(r"\u2064", "", line.strip())
if search_term in cleaned_line.lower():
seeked_found_line = i
logging.debug(
"Found %s at %s, match for %s!",
line,
seeked_found_line,
data.sub,
) # REMOVEME: DEBUG
break
if not seeked_found_line:
# If no single-line match found, try multi-line matching
if seeked_found_line is None:
# Try matching across consecutive lines (up to 5 lines for reasonable performance)
max_lines_to_check = min(5, len(lyric_lines))
for i in range(len(lyric_lines)):
for line_count in range(2, max_lines_to_check + 1):
if i + line_count <= len(lyric_lines):
# Combine consecutive lines with space separator
combined_lines = []
line_positions: list[
tuple[int, int]
] = [] # Track where each line starts in combined text
combined_text_parts: list[str] = []
for j in range(line_count):
if i + j < len(lyric_lines):
cleaned_line = regex.sub(
r"\u2064", "", lyric_lines[i + j].strip()
)
combined_lines.append(cleaned_line)
# Track position of this line in the combined text
line_start_pos = len(
" ".join(combined_text_parts).lower()
)
if line_start_pos > 0:
line_start_pos += (
1 # Account for space separator
)
line_positions.append((i + j, line_start_pos))
combined_text_parts.append(cleaned_line)
combined_text = " ".join(combined_lines).lower()
if search_term in combined_text:
# Find which specific line the match starts in
match_pos = combined_text.find(search_term)
# Find the line that contains the start of the match
actual_start_line = i # Default fallback
for line_idx, line_start_pos in line_positions:
if line_start_pos <= match_pos:
actual_start_line = line_idx
else:
break
seeked_found_line = actual_start_line
break
if seeked_found_line is not None:
break
if seeked_found_line is None:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Seek (a.k.a. subsearch) failed.",
"failed_seek": True,
},
)
result["lyrics"] = " / ".join(lyric_lines[seeked_found_line:])
# Only include lines strictly starting from the matched line
result["lyrics"] = separator.join(lyric_lines[seeked_found_line:])
result["confidence"] = int(result["confidence"])
result["time"] = f"{float(result['time']):.4f}"


@@ -307,7 +307,7 @@ class Radio(FastAPI):
}
)
async def album_art_handler(
def album_art_handler(
self, request: Request, track_id: Optional[int] = None,
station: Station = "main"
) -> Response:
@@ -364,13 +364,21 @@ class Radio(FastAPI):
ret_obj: dict = {**self.radio_util.now_playing[station]}
ret_obj["station"] = station
try:
ret_obj["elapsed"] = int(time.time()) - ret_obj["start"]
ret_obj["elapsed"] = int(time.time()) - ret_obj["start"] if ret_obj["start"] else 0
except KeyError:
traceback.print_exc()
ret_obj["elapsed"] = 0
ret_obj.pop("file_path")
return JSONResponse(content=ret_obj)
def _bg_cache_art(self, track_id: int, file_path: str):
try:
album_art = self.radio_util.get_album_art(track_id=track_id)
if not album_art:
self.radio_util.cache_album_art(track_id, file_path)
except Exception as e:
logging.error(f"Background art cache error: {e}")
async def radio_get_next(
self,
data: ValidRadioNextRequest,
@@ -448,13 +456,7 @@ class Radio(FastAPI):
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
try:
album_art = self.radio_util.get_album_art(track_id=next["id"])
if not album_art:
self.radio_util.cache_album_art(next["id"], next["file_path"])
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
background_tasks.add_task(self._bg_cache_art, next["id"], next["file_path"])
return JSONResponse(content=next)
@@ -496,8 +498,12 @@ class Radio(FastAPI):
},
)
search: bool = self.radio_util.search_db(
artistsong=artistsong, artist=artist, song=song, station=data.station
loop = asyncio.get_running_loop()
search: bool = await loop.run_in_executor(
None,
lambda: self.radio_util.search_db(
artistsong=artistsong, artist=artist, song=song, station=data.station
)
)
if data.alsoSkip:
await self.radio_util._ls_skip(data.station)
@@ -764,9 +770,24 @@ class Radio(FastAPI):
logging.info(f"[LRC] Starting fetch for {station}: {artist} - {title}")
# Try SR first with timeout
# Try LRCLib first with timeout
try:
async with asyncio.timeout(5.0): # 5 second timeout
async with asyncio.timeout(10.0): # 10 second timeout
logging.info("[LRC] Trying LRCLib")
lrclib_result = await self.lrclib.search(artist, title, plain=False, raw=True)
if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
logging.info("[LRC] Found from LRCLib")
return lrclib_result.lyrics, "LRCLib"
except asyncio.TimeoutError:
logging.warning("[LRC] LRCLib fetch timed out")
except Exception as e:
logging.error(f"[LRC] LRCLib fetch error: {e}")
logging.info("[LRC] LRCLib fetch completed without results")
# Try SR as fallback with timeout
try:
async with asyncio.timeout(10.0): # 10 second timeout
lrc = await self.sr_util.get_lrc_by_artist_song(
artist, title, duration=duration
)
@@ -778,21 +799,6 @@ class Radio(FastAPI):
except Exception as e:
logging.error(f"[LRC] SR fetch error: {e}")
logging.info("[LRC] SR fetch completed without results")
# Try LRCLib as fallback with timeout
try:
async with asyncio.timeout(5.0): # 5 second timeout
logging.info("[LRC] Trying LRCLib fallback")
lrclib_result = await self.lrclib.search(artist, title, plain=False)
if lrclib_result and lrclib_result.lyrics and isinstance(lrclib_result.lyrics, str):
logging.info("[LRC] Found from LRCLib")
return lrclib_result.lyrics, "LRCLib"
except asyncio.TimeoutError:
logging.warning("[LRC] LRCLib fetch timed out")
except Exception as e:
logging.error(f"[LRC] LRCLib fetch error: {e}")
logging.info("[LRC] No lyrics found from any source")
return None, "None"
except Exception as e:
@@ -804,11 +810,21 @@ class Radio(FastAPI):
try:
async with self.lrc_cache_locks[station]:
self.lrc_cache.pop(station, None)
lrc, source = await self._fetch_and_cache_lrc(station, track_json)
if lrc:
self.lrc_cache[station] = lrc
lrc, source = await self._fetch_and_cache_lrc(station, track_json)
async with self.lrc_cache_locks[station]:
# Verify we are still on the same song
current_track = self.radio_util.now_playing.get(station)
if current_track and current_track.get("uuid") == track_json.get("uuid"):
if lrc:
self.lrc_cache[station] = lrc
else:
self.lrc_cache[station] = None
else:
self.lrc_cache[station] = None
logging.info(f"[LRC] Discarding fetch result for {station} as track changed.")
return
if lrc:
await self.broadcast_lrc(station, lrc, source)
except Exception as e:


@@ -20,6 +20,9 @@ from lyric_search.sources import private
from typing import Literal
from pydantic import BaseModel
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
class ValidBulkFetchRequest(BaseModel):
track_ids: list[int]


@@ -1,171 +0,0 @@
import os
import aiosqlite as sqlite3
from fastapi import FastAPI, Depends, Response
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union, Iterable, cast
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
class Transcriptions(FastAPI):
"""
Transcription Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None:
"""Initialize Transcriptions endpoints."""
self.app: FastAPI = app
self.util = util
self.constants = constants
self.endpoints: dict = {
"transcriptions/get_episodes": self.get_episodes_handler,
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
# tbd
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(
f"/{endpoint}",
handler,
methods=["POST"],
include_in_schema=True,
dependencies=[Depends(RateLimiter(times=2, seconds=2))],
)
async def get_episodes_handler(
self, data: ValidShowEpisodeListRequest
) -> JSONResponse:
"""
Get list of episodes by show ID.
Parameters:
- **data** (ValidShowEpisodeListRequest): Request containing show ID.
Returns:
- **JSONResponse**: Contains a list of episodes.
"""
show_id: int = data.s
db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None
show_title: Optional[str] = None
if not isinstance(show_id, int):
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Invalid request",
},
)
show_id = int(show_id)
if not (str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Show not found.",
},
)
match show_id:
case 0:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db")
db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
show_title = "South Park"
case 1:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Futurama"
case 2:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db")
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Parks And Rec"
case _:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Unknown error.",
},
)
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
return JSONResponse(
content={
"show_title": show_title,
"episodes": [
{
"id": item[1],
"ep_friendly": item[0],
}
for item in result
],
}
)
async def get_episode_lines_handler(
self, data: ValidShowEpisodeLineRequest
) -> Response:
"""
Get lines for a particular episode.
Parameters:
- **data** (ValidShowEpisodeLineRequest): Request containing show and episode ID.
Returns:
- **Response**: Episode lines.
"""
show_id: int = int(data.s)
episode_id: int = int(data.e)
match show_id:
case 0:
db_path: Union[str, LiteralString] = os.path.join(
"/usr/local/share", "sqlite_dbs", "sp.db"
)
db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
case 2:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
case _:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Unknown error",
},
)
async with sqlite3.connect(database=db_path, timeout=1) as _db:
params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
result_list = cast(list[sqlite3.Row], result)
if not result_list:
return Response(
status_code=404,
content="Not found",
)
first_result: sqlite3.Row = result_list[0]
return JSONResponse(
content={
"episode_id": episode_id,
"ep_friendly": first_result[0].strip(),
"lines": [
{
"speaker": item[1].strip(),
"line": item[2].strip(),
}
for item in result
],
}
)

lyric_search/models.py (new file)

@@ -0,0 +1,112 @@
"""
Database models for LRCLib lyrics cache.
"""
import os
import urllib.parse
from typing import Type, AsyncGenerator
from sqlalchemy import (
Column,
Integer,
String,
Float,
Boolean,
DateTime,
ForeignKey,
UniqueConstraint,
)
from sqlalchemy.orm import relationship, foreign
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from dotenv import load_dotenv
load_dotenv()
Base: Type[DeclarativeMeta] = declarative_base()
class Tracks(Base): # type: ignore
"""Tracks table - stores track metadata."""
__tablename__ = "tracks"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String, index=True)
name_lower = Column(String, index=True)
artist_name = Column(String, index=True)
artist_name_lower = Column(String, index=True)
album_name = Column(String)
album_name_lower = Column(String, index=True)
duration = Column(Float, index=True)
last_lyrics_id = Column(Integer, ForeignKey("lyrics.id"), index=True)
created_at = Column(DateTime)
updated_at = Column(DateTime)
# Relationships
lyrics = relationship(
"Lyrics",
back_populates="track",
foreign_keys=[last_lyrics_id],
primaryjoin="Tracks.id == foreign(Lyrics.track_id)",
)
# Constraints
__table_args__ = (
UniqueConstraint(
"name_lower",
"artist_name_lower",
"album_name_lower",
"duration",
name="uq_tracks",
),
)
class Lyrics(Base): # type: ignore
"""Lyrics table - stores lyrics content."""
__tablename__ = "lyrics"
id = Column(Integer, primary_key=True, autoincrement=True)
plain_lyrics = Column(String)
synced_lyrics = Column(String)
track_id = Column(Integer, ForeignKey("tracks.id"), index=True)
has_plain_lyrics = Column(Boolean, index=True)
has_synced_lyrics = Column(Boolean, index=True)
instrumental = Column(Boolean)
source = Column(String, index=True)
created_at = Column(DateTime, index=True)
updated_at = Column(DateTime)
# Relationships
track = relationship(
"Tracks",
back_populates="lyrics",
foreign_keys=[track_id],
primaryjoin=(Tracks.id == foreign(track_id)),
remote_side=Tracks.id,
)
# PostgreSQL connection - using environment variables
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432")
POSTGRES_DB = os.getenv("POSTGRES_DB", "lrclib")
POSTGRES_USER = os.getenv("POSTGRES_USER", "api")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
# URL-encode the password to handle special characters
encoded_password = urllib.parse.quote_plus(POSTGRES_PASSWORD)
DATABASE_URL: str = f"postgresql+asyncpg://{POSTGRES_USER}:{encoded_password}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
async_engine: AsyncEngine = create_async_engine(
DATABASE_URL, pool_size=20, max_overflow=10, pool_pre_ping=True, echo=False
)
AsyncSessionLocal = async_sessionmaker(bind=async_engine, expire_on_commit=False)
async def get_async_db():
"""Get async database session."""
async with AsyncSessionLocal() as session:
yield session
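A minimal usage sketch of the session factory defined above, assuming the module is importable as lyric_search.models; the query mirrors the exact-match lookup used by the LRCLib cache source later in this diff:

    import asyncio
    from sqlalchemy.future import select
    from lyric_search.models import AsyncSessionLocal, Tracks, Lyrics

    async def lookup(artist: str, song: str):
        """Exact-match lookup against the local LRCLib cache tables."""
        async with AsyncSessionLocal() as db:
            result = await db.execute(
                select(Tracks.artist_name, Tracks.name, Lyrics.plain_lyrics)
                .join(Lyrics, Tracks.id == Lyrics.track_id)
                .filter(
                    Tracks.artist_name_lower == artist.strip().lower(),
                    Tracks.name_lower == song.strip().lower(),
                )
                .limit(1)
            )
            return result.first()

    # asyncio.run(lookup("Example Artist", "Example Song"))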


@@ -14,9 +14,7 @@ class Aggregate:
Aggregate all source methods
"""
def __init__(self, exclude_methods=None) -> None:
if not exclude_methods:
exclude_methods: list = []
def __init__(self, exclude_methods: list = []) -> None:
self.exclude_methods = exclude_methods
self.redis_cache = redis_cache.RedisCache()
self.notifier = notifier.DiscordNotifier()
@@ -70,14 +68,14 @@ class Aggregate:
if plain: # do not record LRC fails
try:
await self.redis_cache.increment_found_count("failed")
self.notifier.send(
"WARNING",
f"Could not find {artist} - {song} via queried sources.",
)
# await self.notifier.send(
# "WARNING",
# f"Could not find {artist} - {song} via queried sources.",
# )
except Exception as e:
traceback.print_exc()
logging.info("Could not increment redis failed counter: %s", str(e))
self.notifier.send(
await self.notifier.send(
f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
f"Could not increment redis failed counter: {str(e)}",
)


@@ -343,15 +343,19 @@ class Cache:
)
else:
best_match = (result_tracks[0], 100)
if not best_match or confidence < 90:
if not best_match:
return None
(candidate, confidence) = best_match
if confidence < 90:
return None
logging.info("Result found on %s", self.label)
matched = self.get_matched(
sqlite_rows=results,
matched_candidate=candidate,
confidence=confidence,
)
if matched is None:
return None
time_end: float = time.time()
time_diff: float = time_end - time_start
matched.time = time_diff


@@ -45,11 +45,11 @@ class Genius:
Optional[LyricsResult]: The result, if found - None otherwise.
"""
try:
artist = artist.strip().lower()
song = song.strip().lower()
artist_name = artist.strip().lower()
song_name = song.strip().lower()
time_start: float = time.time()
logging.info("Searching %s - %s on %s", artist, song, self.label)
search_term: str = f"{artist}%20{song}"
logging.info("Searching %s - %s on %s", artist_name, song_name, self.label)
search_term: str = f"{artist_name}%20{song_name}"
returned_lyrics: str = ""
async with ClientSession() as client:
async with client.get(
@@ -100,10 +100,13 @@ class Genius:
)
for returned in possible_matches
]
searched: str = f"{artist} - {song}"
best_match: tuple = self.matcher.find_best_match(
searched: str = f"{artist_name} - {song_name}"
best_match: Optional[tuple] = self.matcher.find_best_match(
input_track=searched, candidate_tracks=to_scrape
)
if not best_match:
raise InvalidGeniusResponseException("No matching result")
logging.info("To scrape: %s", to_scrape)
((scrape_stub, track), confidence) = best_match
scrape_url: str = f"{self.genius_url}{scrape_stub[1:]}"
@@ -157,8 +160,8 @@ class Genius:
returned_lyrics: str = self.datautils.scrub_lyrics(
returned_lyrics
)
artist: str = track.split(" - ", maxsplit=1)[0]
song: str = track.split(" - ", maxsplit=1)[1]
artist = track.split(" - ", maxsplit=1)[0]
song = track.split(" - ", maxsplit=1)[1]
logging.info("Result found on %s", self.label)
time_end: float = time.time()
time_diff: float = time_end - time_start


@@ -1,45 +1,41 @@
import time
import traceback
import logging
from typing import Optional, Union
from aiohttp import ClientTimeout, ClientSession
from tenacity import retry, stop_after_attempt, wait_fixed
from typing import Optional
from sqlalchemy.future import select
from lyric_search import utils
from lyric_search.constructors import LyricsResult
from . import common, cache, redis_cache
from lyric_search.constructors import InvalidLRCLibResponseException
from lyric_search.models import Tracks, Lyrics, AsyncSessionLocal
from . import redis_cache
logger = logging.getLogger()
log_level = logging.getLevelName(logger.level)
class LRCLib:
"""LRCLib Search Module"""
"""LRCLib Search Module - Local PostgreSQL Database"""
def __init__(self) -> None:
self.label: str = "LRCLib"
self.lrclib_url: str = "https://lrclib.net/api/search"
self.headers: dict = common.SCRAPE_HEADERS
self.timeout = ClientTimeout(connect=3, sock_read=8)
self.label: str = "LRCLib-Cache"
self.datautils = utils.DataUtils()
self.matcher = utils.TrackMatcher()
self.cache = cache.Cache()
self.redis_cache = redis_cache.RedisCache()
@retry(stop=stop_after_attempt(2), wait=wait_fixed(0.5))
async def search(
self,
artist: str,
song: str,
plain: Optional[bool] = True,
duration: Optional[int] = None,
raw: bool = False,
) -> Optional[LyricsResult]:
"""
LRCLib Search
LRCLib Local Database Search
Args:
artist (str): the artist to search
song (str): the song to search
plain (bool): return plain lyrics (True) or synced lyrics (False)
duration (int): optional track duration for better matching
raw (bool): return raw LRC string instead of parsed object (only for synced)
Returns:
Optional[LyricsResult]: The result, if found - None otherwise.
"""
@@ -47,140 +43,117 @@ class LRCLib:
artist = artist.strip().lower()
song = song.strip().lower()
time_start: float = time.time()
lrc_obj: Optional[list[dict]] = None
logging.info("Searching %s - %s on %s", artist, song, self.label)
input_track: str = f"{artist} - {song}"
returned_lyrics: str = ""
async with ClientSession() as client:
async with await client.get(
self.lrclib_url,
params={
"artist_name": artist,
"track_name": song,
**({"duration": duration} if duration else {}),
},
timeout=self.timeout,
headers=self.headers,
) as request:
request.raise_for_status()
async with AsyncSessionLocal() as db:
best_match = None
text: Optional[str] = await request.text()
if not text:
raise InvalidLRCLibResponseException("No search response.")
if len(text) < 100:
raise InvalidLRCLibResponseException(
"Search response text was invalid (len < 100 chars.)"
# Try exact match first (fastest)
result = await db.execute(
select(
Tracks.artist_name,
Tracks.name,
Lyrics.plain_lyrics,
Lyrics.synced_lyrics,
)
.join(Lyrics, Tracks.id == Lyrics.track_id)
.filter(
Tracks.artist_name_lower == artist,
Tracks.name_lower == song,
)
.limit(1)
)
best_match = result.first()
# If no exact match, try prefix match (faster than full ILIKE)
if not best_match:
result = await db.execute(
select(
Tracks.artist_name,
Tracks.name,
Lyrics.plain_lyrics,
Lyrics.synced_lyrics,
)
.join(Lyrics, Tracks.id == Lyrics.track_id)
.filter(
Tracks.artist_name_lower.like(f"{artist}%"),
Tracks.name_lower.like(f"{song}%"),
)
.limit(1)
)
best_match = result.first()
search_data: Optional[Union[list, dict]] = await request.json()
if not isinstance(search_data, list | dict):
raise InvalidLRCLibResponseException("No JSON search data.")
# If still no match, try full ILIKE (slowest)
if not best_match:
result = await db.execute(
select(
Tracks.artist_name,
Tracks.name,
Lyrics.plain_lyrics,
Lyrics.synced_lyrics,
)
.join(Lyrics, Tracks.id == Lyrics.track_id)
.filter(
Tracks.artist_name_lower.ilike(f"%{artist}%"),
Tracks.name_lower.ilike(f"%{song}%"),
)
.limit(1)
)
best_match = result.first()
# logging.info("Search Data:\n%s", search_data)
if not best_match:
logging.info("No result found on %s", self.label)
return None
if not isinstance(search_data, list):
raise InvalidLRCLibResponseException("Invalid JSON.")
returned_artist = best_match.artist_name
returned_song = best_match.name
# Filter by duration if provided
if duration:
search_data = [
r
for r in search_data
if abs(r.get("duration", 0) - duration) <= 10
]
if plain:
possible_matches = [
(
x,
f"{result.get('artistName')} - {result.get('trackName')}",
)
for x, result in enumerate(search_data)
]
if plain:
if not best_match.plain_lyrics:
logging.info("No plain lyrics available on %s", self.label)
return None
returned_lyrics = best_match.plain_lyrics
returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
lrc_obj = None
else:
if not best_match.synced_lyrics:
logging.info("No synced lyrics available on %s", self.label)
return None
returned_lyrics = best_match.synced_lyrics
if raw:
lrc_obj = returned_lyrics
else:
logging.info(
"Limiting possible matches to only those with non-null syncedLyrics"
)
possible_matches = [
(
x,
f"{result.get('artistName')} - {result.get('trackName')}",
)
for x, result in enumerate(search_data)
if isinstance(result["syncedLyrics"], str)
]
best_match = None
try:
match_result = self.matcher.find_best_match(
input_track,
possible_matches, # type: ignore
)
if match_result:
best_match = match_result[0]
except: # noqa
pass
if not best_match:
return
best_match_id = best_match[0]
if not isinstance(search_data[best_match_id]["artistName"], str):
raise InvalidLRCLibResponseException(
f"Invalid JSON: Cannot find artistName key.\n{search_data}"
)
if not isinstance(search_data[best_match_id]["trackName"], str):
raise InvalidLRCLibResponseException(
f"Invalid JSON: Cannot find trackName key.\n{search_data}"
)
returned_artist: str = search_data[best_match_id]["artistName"]
returned_song: str = search_data[best_match_id]["trackName"]
if plain:
if not isinstance(
search_data[best_match_id]["plainLyrics"], str
):
raise InvalidLRCLibResponseException(
f"Invalid JSON: Cannot find plainLyrics key.\n{search_data}"
)
returned_lyrics: str = search_data[best_match_id]["plainLyrics"]
returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics)
else:
if not isinstance(
search_data[best_match_id]["syncedLyrics"], str
):
raise InvalidLRCLibResponseException(
f"Invalid JSON: Cannot find syncedLyrics key.\n{search_data}"
)
returned_lyrics: str = search_data[best_match_id][
"syncedLyrics"
]
lrc_obj = self.datautils.create_lrc_object(returned_lyrics)
returned_track: str = f"{returned_artist} - {returned_song}"
match_result = self.matcher.find_best_match(
input_track=input_track, candidate_tracks=[(0, returned_track)]
)
if not match_result:
return # No suitable match found
_matched, confidence = match_result
logging.info("Result found on %s", self.label)
time_end: float = time.time()
time_diff: float = time_end - time_start
matched = LyricsResult(
artist=returned_artist,
song=returned_song,
src=self.label,
lyrics=returned_lyrics if plain else lrc_obj, # type: ignore
confidence=confidence,
time=time_diff,
)
await self.redis_cache.increment_found_count(self.label)
if plain:
await self.cache.store(matched)
return matched
# Calculate match confidence
input_track = f"{artist} - {song}"
returned_track = f"{returned_artist} - {returned_song}"
match_result = self.matcher.find_best_match(
input_track=input_track, candidate_tracks=[(0, returned_track)]
)
if not match_result:
return None
_matched, confidence = match_result
logging.info("Result found on %s", self.label)
time_end = time.time()
time_diff = time_end - time_start
matched = LyricsResult(
artist=returned_artist,
song=returned_song,
src=self.label,
lyrics=returned_lyrics if plain else lrc_obj, # type: ignore
confidence=confidence,
time=time_diff,
)
await self.redis_cache.increment_found_count(self.label)
return matched
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
logging.error("Exception in %s: %s", self.label, str(e))
return None


@@ -13,7 +13,7 @@ from lyric_search import notifier
from lyric_search.constructors import LyricsResult
import redis.asyncio as redis
from redis.commands.search.query import Query # type: ignore
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # type: ignore
from redis.commands.search.index_definition import IndexDefinition, IndexType # type: ignore
from redis.commands.search.field import TextField, Field # type: ignore
from redis.commands.json.path import Path # type: ignore
from . import private
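The one-line import change above tracks the snake_case module path used by newer redis-py releases, per the dependency-upgrade commit in the list above. A brief sketch of the import in use, with an index name, prefix, and fields chosen purely for illustration:

    from redis.commands.search.index_definition import IndexDefinition, IndexType
    from redis.commands.search.field import TextField

    # Illustrative only: index name, prefix, and fields are not taken from this diff.
    definition = IndexDefinition(prefix=["lyrics:"], index_type=IndexType.JSON)
    fields = (
        TextField("$.artist", as_name="artist"),
        TextField("$.song", as_name="song"),
    )
    # Given a redis client `r`:
    # r.ft("lyrics_idx").create_index(fields, definition=definition)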

File diff suppressed because it is too large.


@@ -1,252 +0,0 @@
#!/usr/bin/env liquidsoap
set("log.file.path","/home/kyle/.lsl.txt")
set("log.stdout",true)
set("harbor.bind_addrs", ["127.0.0.1"])
# Buffer and timing settings
set("frame.duration",0.02)
set("root.max_latency",2.)
set("audio.converter.samplerate.libsamplerate.quality","best")
set("clock.allow_streaming_errors",false)
# Get next track dynamically [Each station]
def get_next_main() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py main"))
[request.create(uri)]
end
def get_next_rock() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py rock"))
[request.create(uri)]
end
def get_next_electronic() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py electronic"))
[request.create(uri)]
end
def get_next_rap() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py rap"))
[request.create(uri)]
end
#def get_next_classical() =
# uri = list.hd(default="", process.read.lines("uv run get_next_track.py classical"))
# [request.create(uri)]
#end
def get_next_pop() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py pop"))
[request.create(uri)]
end
# Set up queues [Each station]
main_list = request.dynamic(
id="requests",
get_next_main,
retry_delay=1.0,
timeout=20.0
)
rock_list = request.dynamic(
id="rock_requests",
get_next_rock,
retry_delay=1.0,
timeout=20.0
)
electronic_list = request.dynamic(
id="electronic_requests",
get_next_electronic,
retry_delay=1.0,
timeout=20.0
)
rap_list = request.dynamic(
id="rap_requests",
get_next_rap,
retry_delay=1.0,
timeout=20.0
)
#classical_list = request.dynamic.list(
# id="classical_requests",
# get_next_classical,
# prefetch=0
#)
pop_list = request.dynamic(
id="pop_requests",
get_next_pop,
retry_delay=1.0,
timeout=20.0
)
# Standard
silence = single("/home/kyle/ls/silence.ogg")
# Queue [Each station]
def main_queue(remaining, _) =
log("MAIN: Queueing with #{remaining} seconds remaining")
if not main_list.fetch() then
log("Fetching next query failed")
end
end
def rock_queue(remaining, _) =
log("ROCK: Queueing with #{remaining} seconds remaining")
if not rock_list.fetch() then
log("Fetching next query failed")
end
end
def electronic_queue(remaining, _) =
log("ELECTRONIC: Queueing with #{remaining} seconds remaining")
if not electronic_list.fetch() then
log("Fetching next query failed")
end
end
def rap_queue(remaining, _) =
log("RAP: Queueing with #{remaining} seconds remaining")
if not rap_list.fetch() then
log("Fetching next query failed")
end
end
#def classical_queue(remaining, _) =
# log("CLASSICAL: Queueing with #{remaining} seconds remaining")
# if not classical_list.fetch() then
# log("Fetching next query failed")
# end
#end
def pop_queue(remaining, _) =
log("POP: Queueing with #{remaining} seconds remaining")
if not pop_list.fetch() then
log("Fetching next query failed")
end
end
# Initial fetch [Each station]
main_list.fetch()
rock_list.fetch()
electronic_list.fetch()
rap_list.fetch()
#classical_list.fetch()
pop_list.fetch()
# Source setup [Each station]
def create_source(s,q) =
source.dynamic(s, track_sensitive=true, {q()})
end
main_source = create_source(main_list, main_queue)
rock_source = create_source(rock_list, rock_queue)
electronic_source = create_source(electronic_list, electronic_queue)
rap_source = create_source(rap_list, rap_queue)
#classical_source = create_source(classical_list, classical_queue)
pop_source = create_source(pop_list, pop_queue)
all_tracks_main = fallback(track_sensitive=false, [main_source, silence])
all_tracks_rock = fallback(track_sensitive=false, [rock_source, silence])
all_tracks_electronic = fallback(track_sensitive=false, [electronic_source, silence])
all_tracks_rap = fallback(track_sensitive=false, [rap_source, silence])
#all_tracks_classical = fallback(track_sensitive=false, [classical_source, silence])
all_tracks_pop = fallback(track_sensitive=false, [pop_source, silence])
# HLS Setup [Standard]
aac_lofi = %ffmpeg(format="mpegts",
%audio(codec="aac",
channels=2,
ar=48000,
b="128k"))
aac_midfi = %ffmpeg(format="mpegts",
%audio(codec="aac",
channels=2,
ar=48000,
b="256k"))
aac_hifi = %ffmpeg(format="mpegts",
%audio(codec="aac",
channels=2,
ar=48000,
b="512k"))
streams =
[("aac_lofi", aac_lofi), ("aac_midfi", aac_midfi), ("aac_hifi", aac_hifi)]
# HLS Outputs [Each station]
def create_hls_output(~name, source) =
output.file.hls(
playlist="#{name}.m3u8",
segment_duration=0.5,
segments=10,
segments_overhead=5,
persist_at="/nvme/pub/hls/#{name}/state.config",
"/nvme/pub/hls/#{name}",
streams,
source
)
end
create_hls_output(name="main", mksafe(main_source))
create_hls_output(name="rock", mksafe(rock_source))
create_hls_output(name="electronic", mksafe(electronic_source))
create_hls_output(name="rap", mksafe(rap_source))
#output.file.hls(
# playlist="classical.m3u8",
# segment_duration=0.45,
# segments=9,
# segments_overhead=3,
# persist_at="/nvme/pub/hls/classical_state.config",
# "/nvme/pub/hls/classical",
# streams,
# mksafe(classical_source)
#)
create_hls_output(name="pop", mksafe(pop_source))
# HTTP Server
def get_next_http(~protocol,~data,~headers,uri) =
source =
if data == "main" then main_source
elsif data == "rock" then rock_source
elsif data == "electronic" then electronic_source
elsif data == "rap" then rap_source
elsif data == "pop" then pop_source
else null() end
if source != null() then
source.skip(source)
http.response(
protocol=protocol,
code=200,
data="OK #{data}"
)
end
harbor.http.register(port=29000, method="POST", "/next", get_next_http)
# EOF
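The handler above registers a POST /next endpoint on 127.0.0.1:29000 that skips the current track for the station named in the request body. For reference, a minimal Python sketch of how a caller (for example, the backend's skip helper) might hit it; the URL and payload follow the script above, but the function name and aiohttp usage are illustrative, not code from this repository:

import aiohttp

async def request_station_skip(station: str) -> bool:
    # POST the station name as the raw request body, matching the
    # harbor.http.register(..., "/next", ...) handler above.
    async with aiohttp.ClientSession() as session:
        async with session.post("http://127.0.0.1:29000/next", data=station) as resp:
            body = await resp.text()
            # The Liquidsoap handler replies "OK <station>" on success.
            return resp.status == 200 and body.startswith("OK")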

View File

@@ -1,270 +0,0 @@
#!/usr/bin/liquidsoap
set("log.file.path", "/home/kyle/.lsl.txt")
set("log.stdout", true)
set("harbor.bind_addrs", ["127.0.0.1"])
# Get next track dynamically [Each station]
def get_next_main() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py main"))
[request.create(uri)]
end
def get_next_rock() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py rock"))
[request.create(uri)]
end
def get_next_electronic() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py electronic"))
[request.create(uri)]
end
def get_next_rap() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py rap"))
[request.create(uri)]
end
#def get_next_classical() =
# uri = list.hd(default="", process.read.lines("uv run get_next_track.py classical"))
# [request.create(uri)]
#end
def get_next_pop() =
uri = list.hd(default="", process.read.lines("uv run get_next_track.py pop"))
[request.create(uri)]
end
# Set up queues [Each station]
main_list = request.dynamic.list(
id="requests",
get_next_main,
prefetch=0
)
rock_list = request.dynamic.list(
id="rock_requests",
get_next_rock,
prefetch=0
)
electronic_list = request.dynamic.list(
id="electronic_requests",
get_next_electronic,
prefetch=0
)
rap_list = request.dynamic.list(
id="rap_requests",
get_next_rap,
prefetch=0
)
#classical_list = request.dynamic.list(
# id="classical_requests",
# get_next_classical,
# prefetch=0
#)
pop_list = request.dynamic.list(
id="pop_requests",
get_next_pop,
prefetch=0
)
# Standard
silence = single("/home/kyle/ls/silence.ogg")
# Queue [Each station]
def main_queue(remaining, _) =
log("MAIN: Queueing with #{remaining} seconds remaining")
if not main_list.fetch() then
log("Fetching next query failed")
end
end
def rock_queue(remaining, _) =
log("ROCK: Queueing with #{remaining} seconds remaining")
if not rock_list.fetch() then
log("Fetching next query failed")
end
end
def electronic_queue(remaining, _) =
log("ELECTRONIC: Queueing with #{remaining} seconds remaining")
if not electronic_list.fetch() then
log("Fetching next query failed")
end
end
def rap_queue(remaining, _) =
log("RAP: Queueing with #{remaining} seconds remaining")
if not rap_list.fetch() then
log("Fetching next query failed")
end
end
#def classical_queue(remaining, _) =
# log("CLASSICAL: Queueing with #{remaining} seconds remaining")
# if not classical_list.fetch() then
# log("Fetching next query failed")
# end
#end
def pop_queue(remaining, _) =
log("POP: Queueing with #{remaining} seconds remaining")
if not pop_list.fetch() then
log("Fetching next query failed")
end
end
# Initial fetch [Each station]
main_list.fetch()
rock_list.fetch()
electronic_list.fetch()
rap_list.fetch()
#classical_list.fetch()
pop_list.fetch()
# Source setup [Each station]
main_source = source.on_end(delay=1.0, main_list, main_queue)
rock_source = source.on_end(delay=1.0, rock_list, rock_queue)
electronic_source = source.on_end(delay=1.0, electronic_list, electronic_queue)
rap_source = source.on_end(delay=1.0, rap_list, rap_queue)
#classical_source = source.on_end(delay=1.0, classical_list, classical_queue)
pop_source = source.on_end(delay=1.0, pop_list, pop_queue)
all_tracks_main = fallback(track_sensitive=false, [main_source, silence])
all_tracks_rock = fallback(track_sensitive=false, [rock_source, silence])
all_tracks_electronic = fallback(track_sensitive=false, [electronic_source, silence])
all_tracks_rap = fallback(track_sensitive=false, [rap_source, silence])
#all_tracks_classical = fallback(track_sensitive=false, [classical_source, silence])
all_tracks_pop = fallback(track_sensitive=false, [pop_source, silence])
# HLS Setup [Standard]
aac_lofi =
%ffmpeg(format = "mpegts", %audio(codec = "aac", channels = 2, ar = 44100))
aac_midfi =
%ffmpeg(
format = "mpegts",
%audio(codec = "aac", channels = 2, ar = 44100, b = "96k")
)
aac_hifi =
%ffmpeg(
format = "mpegts",
%audio(codec = "aac", channels = 2, ar = 44100, b = "448k")
)
streams =
[("aac_lofi", aac_lofi), ("aac_midfi", aac_midfi), ("aac_hifi", aac_hifi)]
# HLS Outputs [Each station]
output.file.hls(
playlist="main.m3u8",
segment_duration=0.5,
segments=9,
segments_overhead=4,
persist_at="/nvme/pub/hls/state.config",
"/nvme/pub/hls/main",
streams,
mksafe(main_source)
)
output.file.hls(
playlist="rock.m3u8",
segment_duration=0.5,
segments=9,
segments_overhead=4,
persist_at="/nvme/pub/hls/rock/state.config",
"/nvme/pub/hls/rock",
streams,
mksafe(rock_source)
)
output.file.hls(
playlist="electronic.m3u8",
segment_duration=0.5,
segments=9,
segments_overhead=4,
persist_at="/nvme/pub/hls/electronic/state.config",
"/nvme/pub/hls/electronic",
streams,
mksafe(electronic_source)
)
output.file.hls(
playlist="rap.m3u8",
segment_duration=0.5,
segments=9,
segments_overhead=4,
persist_at="/nvme/pub/hls/rap_state.config",
"/nvme/pub/hls/rap",
streams,
mksafe(rap_source)
)
#output.file.hls(
# playlist="classical.m3u8",
# segment_duration=0.45,
# segments=9,
# segments_overhead=3,
# persist_at="/nvme/pub/hls/classical_state.config",
# "/nvme/pub/hls/classical",
# streams,
# mksafe(classical_source)
#)
output.file.hls(
playlist="pop.m3u8",
segment_duration=0.5,
segments=9,
segments_overhead=4,
persist_at="/nvme/pub/hls/pop_state.config",
"/nvme/pub/hls/pop",
streams,
mksafe(pop_source)
)
# HTTP Server
def get_next_http(~protocol,~data,~headers,uri) =
if data == "main" then
_req = source.skip(main_source)
elsif data == "rock" then
_req = source.skip(rock_source)
elsif data == "electronic" then
_req = source.skip(electronic_source)
elsif data == "rap" then
_req = source.skip(rap_source)
#elsif data == "classical" then
# _req = source.skip(classical_source)
elsif data == "pop" then
_req = source.skip(pop_source)
end
http.response(
protocol=protocol,
code=200,
data="OK #{data}"
)
end
harbor.http.register(port=29000, method="POST", "/next", get_next_http)
# EOF

View File

@@ -17,7 +17,7 @@ from rapidfuzz import fuzz
from endpoints.constructors import RadioException
import redis.asyncio as redis
from redis.commands.search.query import Query # noqa
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # noqa
from redis.commands.search.index_definition import IndexDefinition, IndexType # noqa
from redis.commands.search.field import TextField # noqa
from redis.commands.json.path import Path # noqa
from lyric_search.sources import private
@@ -339,7 +339,10 @@ class RadioUtil:
time_start: float = time.time()
artist_genre: dict[str, str] = {}
query: str = (
"SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE"
"SELECT REPLACE(GROUP_CONCAT(DISTINCT g.name), ',', ', ') AS genre FROM artists a "
"JOIN artist_genres ag ON a.id = ag.artist_id "
"JOIN genres g ON ag.genre_id = g.id "
"WHERE a.name LIKE ? COLLATE NOCASE"
)
with sqlite3.connect(self.artist_genre_db_path) as _db:
_db.row_factory = sqlite3.Row
@@ -347,7 +350,7 @@ class RadioUtil:
params: tuple[str] = (f"%%{artist}%%",)
_cursor = _db.execute(query, params)
res = _cursor.fetchone()
if not res:
if not res or not res["genre"]:
artist_genre[artist] = "N/A"
continue
artist_genre[artist] = res["genre"]
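The rewritten query assumes a normalized genre schema (artists, artist_genres, genres) instead of the old flat artist_genre table. A self-contained sketch of that assumed layout run against an in-memory SQLite database, to show the GROUP_CONCAT aggregation; table and column names are inferred from the query and the sample data is made up:

import sqlite3

# Assumed schema implied by the new query; the real DDL may differ.
db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE artists (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE genres (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE artist_genres (artist_id INTEGER, genre_id INTEGER);
    INSERT INTO artists VALUES (1, 'Deftones');
    INSERT INTO genres VALUES (1, 'Alternative Metal'), (2, 'Shoegaze');
    INSERT INTO artist_genres VALUES (1, 1), (1, 2);
    """
)
row = db.execute(
    "SELECT REPLACE(GROUP_CONCAT(DISTINCT g.name), ',', ', ') AS genre "
    "FROM artists a "
    "JOIN artist_genres ag ON a.id = ag.artist_id "
    "JOIN genres g ON ag.genre_id = g.id "
    "WHERE a.name LIKE ? COLLATE NOCASE",
    ("%deftones%",),
).fetchone()
print(row[0])  # e.g. "Alternative Metal, Shoegaze" (concatenation order not guaranteed)

Note that when no artist matches, the aggregate still returns a single row whose genre is NULL, which is why the calling code now checks res["genre"] in addition to res.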
@@ -367,14 +370,17 @@ class RadioUtil:
try:
artist = artist.strip()
query: str = (
"SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE"
"SELECT REPLACE(GROUP_CONCAT(DISTINCT g.name), ',', ', ') AS genre FROM artists a "
"JOIN artist_genres ag ON a.id = ag.artist_id "
"JOIN genres g ON ag.genre_id = g.id "
"WHERE a.name LIKE ? COLLATE NOCASE"
)
params: tuple[str] = (artist,)
with sqlite3.connect(self.playback_db_path, timeout=2) as _db:
_db.row_factory = sqlite3.Row
_cursor = _db.execute(query, params)
res = _cursor.fetchone()
if not res:
if not res or not res["genre"]:
return "Not Found" # Exception suppressed
# raise RadioException(
# f"Could not locate {artist} in artist_genre_map db."
@@ -480,18 +486,18 @@ class RadioUtil:
)
"""Loading Complete"""
self.playlists_loaded = True
# Request skip from LS to bring streams current
for playlist in self.playlists:
logging.info("Skipping: %s", playlist)
await self._ls_skip(playlist)
self.playlists_loaded = True
except Exception as e:
logging.info("Playlist load failed: %s", str(e))
traceback.print_exc()
def cache_album_art(self, track_id: int, file_path: str) -> None:
"""
Cache Album Art to SQLite DB
Cache Album Art to SQLite DB - IMPROVED VERSION
Args:
track_id (int): Track ID to update
file_path (str): Path to file, for artwork extraction
@@ -499,30 +505,92 @@ class RadioUtil:
None
"""
try:
logging.info(
"cache_album_art: Attempting to store album art for track_id: %s",
track_id,
)
tagger = music_tag.load_file(file_path)
album_art = tagger["artwork"].first.data if tagger else None
with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
db_cursor = db_conn.execute(
"INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES(?, ?)",
(
track_id,
album_art,
),
)
if isinstance(db_cursor.lastrowid, int):
db_conn.commit()
# Validate file exists first
if not os.path.exists(file_path):
logging.warning("cache_album_art: File not found: %s", file_path)
return
logging.info("cache_album_art: Attempting to store album art for track_id: %s", track_id)
# Check if artwork already exists to avoid duplicates
with sqlite3.connect(self.album_art_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
cursor = db_conn.execute("SELECT track_id FROM album_art WHERE track_id = ?", (track_id,))
if cursor.fetchone():
logging.debug("cache_album_art: Track %s already has album art", track_id)
return
# Load file with better error handling
try:
tagger = music_tag.load_file(file_path)
except Exception as e:
logging.warning("cache_album_art: Failed to load file %s: %s", file_path, e)
return
# Extract artwork with validation
album_art = None
try:
if not tagger:
logging.debug("cache_album_art: No tagger available for track %s", track_id)
return
artwork_field = tagger["artwork"]
if artwork_field and hasattr(artwork_field, 'first') and artwork_field.first:
first_artwork = artwork_field.first
if hasattr(first_artwork, 'data') and first_artwork.data:
potential_art = first_artwork.data
# Validate artwork data
if isinstance(potential_art, bytes) and len(potential_art) > 100:
# Check if it looks like valid image data
if (potential_art.startswith(b'\xff\xd8') or # JPEG
potential_art.startswith(b'\x89PNG') or # PNG
potential_art.startswith(b'GIF87a') or # GIF87a
potential_art.startswith(b'GIF89a') or # GIF89a
potential_art.startswith(b'RIFF')): # WEBP/other RIFF
album_art = potential_art
logging.debug("cache_album_art: Found valid artwork (%s bytes)", len(album_art))
else:
logging.warning("cache_album_art: Invalid artwork format for track %s - not caching", track_id)
return
else:
logging.debug("cache_album_art: No valid artwork data for track %s", track_id)
return
else:
logging.debug("cache_album_art: No artwork data available for track %s", track_id)
return
else:
logging.debug(
"No row inserted for track_id: %s w/ file_path: %s",
track_id,
file_path,
logging.debug("cache_album_art: No artwork field for track %s", track_id)
return
except Exception as e:
logging.warning("cache_album_art: Error extracting artwork for track %s: %s", track_id, e)
return
# Only proceed if we have valid artwork
if not album_art:
logging.debug("cache_album_art: No valid artwork to cache for track %s", track_id)
return
# Insert into database
try:
with sqlite3.connect(self.album_art_db_path, timeout=5) as db_conn:
cursor = db_conn.execute(
"INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES (?, ?)",
(track_id, album_art)
)
if cursor.rowcount == 1:
db_conn.commit()
logging.info("cache_album_art: Successfully cached %s bytes for track %s", len(album_art), track_id)
else:
logging.debug("cache_album_art: No row inserted for track_id: %s (may already exist)", track_id)
except Exception as e:
logging.error("cache_album_art: Database error for track %s: %s", track_id, e)
except Exception as e:
logging.debug("cache_album_art Exception: %s", str(e))
logging.error("cache_album_art: Unexpected error for track %s: %s", track_id, e)
traceback.print_exc()
def get_album_art(self, track_id: int) -> Optional[bytes]:
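The new artwork path only caches bytes whose header looks like a known image format. A stand-alone sketch of that signature check using the same magic bytes as above; the helper name is illustrative, and note that matching RIFF alone also accepts WAV/AVI containers, so this is a cheap sanity check rather than full validation:

def looks_like_image(data: bytes, min_size: int = 100) -> bool:
    # Cheap magic-byte check mirroring the validation in cache_album_art.
    if not isinstance(data, bytes) or len(data) <= min_size:
        return False
    signatures = (
        b"\xff\xd8",   # JPEG
        b"\x89PNG",    # PNG
        b"GIF87a",     # GIF87a
        b"GIF89a",     # GIF89a
        b"RIFF",       # RIFF container (WEBP, but also WAV/AVI)
    )
    # bytes.startswith accepts a tuple of prefixes, so one call covers all formats.
    return data.startswith(signatures)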

View File

@@ -45,6 +45,29 @@ load_dotenv()
sr = SRUtil()
logger = logging.getLogger(__name__)
async def check_flac_stream(file_path):
"""Check if the given file contains a FLAC stream using ffprobe."""
cmd = [
"ffprobe",
"-v",
"error",
"-select_streams",
"a:0",
"-show_entries",
"stream=codec_name",
"-of",
"default=noprint_wrappers=1:nokey=1",
file_path,
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, _ = await process.communicate()
return b"flac" in stdout
# ---------- Discord helper ----------
async def discord_notify(
@@ -259,13 +282,16 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
)
)
async def process_tracks():
async def process_tracks(track_list):
per_track_meta = []
all_final_files = []
all_artists = set()
(ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
# Ensure aiohttp session is properly closed
async with aiohttp.ClientSession(headers=HEADERS) as session:
print(f"DEBUG: Starting process_tracks with {len(track_list)} tracks")
# Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil
async def _rate_limit_notify(exc: Exception):
try:
@@ -285,6 +311,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
pass
total = len(track_list or [])
for i, track_id in enumerate(track_list or []):
print(f"DEBUG: Processing track {i + 1}/{total}: {track_id}")
track_info = {
"track_id": str(track_id),
"status": "Pending",
@@ -300,31 +327,53 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
track_info["attempts"] = attempt
try:
sr.get_cover_by_album_id
url = await sr.get_stream_url_by_track_id(track_id, quality)
if not url:
raise RuntimeError("No stream URL")
print(f"DEBUG: Getting downloadable for track {track_id}")
# Fetch downloadable (handles DASH and others)
downloadable = await sr._safe_api_call(
sr.streamrip_client.get_downloadable,
str(track_id),
2 if quality == "FLAC" else 1,
retries=3,
)
parsed = urlparse(url)
clean_path = unquote(parsed.path)
ext = Path(clean_path).suffix or ".mp3"
print(f"DEBUG: Got downloadable: {type(downloadable)}")
if not downloadable:
raise RuntimeError("No downloadable created")
ext = f".{downloadable.extension}"
tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")
async with session.get(url) as resp:
resp.raise_for_status()
with open(tmp_file, "wb") as f:
async for chunk in resp.content.iter_chunked(64 * 1024):
f.write(chunk)
print(f"DEBUG: Starting download to {tmp_file}")
# Download
print(f"TRACK {track_id}: Starting download")
try:
await downloadable._download(
str(tmp_file), callback=lambda x=None: None
)
print(
f"TRACK {track_id}: Download method completed normally"
)
except Exception as download_e:
print(
f"TRACK {track_id}: Download threw exception: {download_e}"
)
raise
print(
f"DEBUG: Download completed, file exists: {tmp_file.exists()}"
)
if not tmp_file.exists():
raise RuntimeError(
f"Download completed but no file created: {tmp_file}"
)
print(f"DEBUG: Fetching metadata for track {track_id}")
# Metadata fetch
try:
md = await sr.get_metadata_by_track_id(track_id) or {}
print(f"DEBUG: Metadata fetched: {bool(md)}")
except MetadataFetchError as me:
# Permanent metadata failure — notify and continue (mark track failed)
msg = f"Metadata permanently failed for track {track_id}: {me}"
try:
send_log_to_discord(msg, "ERROR", target)
except Exception:
pass
# Permanent metadata failure — mark failed and break
track_info["status"] = "Failed"
track_info["error"] = str(me)
per_track_meta.append(track_info)
@@ -333,6 +382,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.meta["progress"] = int(((i + 1) / total) * 100)
job.save_meta()
break
artist_raw = md.get("artist") or "Unknown Artist"
album_raw = md.get("album") or "Unknown Album"
title_raw = md.get("title") or f"Track {track_id}"
@@ -341,15 +391,19 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
album = sanitize_filename(album_raw)
title = sanitize_filename(title_raw)
print(f"TRACK {track_id}: Processing '{title}' by {artist}")
all_artists.add(artist)
album_dir = staging_root / artist / album
album_dir.mkdir(parents=True, exist_ok=True)
final_file = ensure_unique_path(album_dir / f"{title}{ext}")
# Move file into final location first (tags will be updated on moved file)
# Move to final location
print(f"TRACK {track_id}: Moving to final location...")
tmp_file.rename(final_file)
print(f"TRACK {track_id}: File moved successfully")
# Try to fetch cover art via SRUtil (use album_id from metadata)
# Fetch cover art
try:
album_field = md.get("album")
album_id = md.get("album_id") or (
@@ -370,49 +424,46 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
else:
cover_url = md.get("cover_url")
# Embed tags + artwork using music_tag if available, falling back to mediafile tagging
# Embed tags
embedded = False
try:
if cover_url:
try:
timeout = aiohttp.ClientTimeout(total=15)
async with session.get(
cover_url, timeout=timeout
) as img_resp:
if img_resp.status == 200:
img_bytes = await img_resp.read()
else:
img_bytes = None
# Notify Discord about failed cover download (HTTP error)
try:
send_log_to_discord(
f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`",
"WARNING",
target,
)
except Exception:
pass
except Exception as e:
img_bytes = None
# Notify Discord about exception during cover download
try:
send_log_to_discord(
f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`",
"WARNING",
target,
)
except Exception:
pass
else:
img_bytes = None
# Prefer music_tag if available (keeps compatibility with add_cover_art.py)
img_bytes = None
if cover_url:
try:
from music_tag import load_file as mt_load_file # type: ignore
timeout = aiohttp.ClientTimeout(total=15)
async with session.get(
cover_url, timeout=timeout
) as img_resp:
if img_resp.status == 200:
img_bytes = await img_resp.read()
else:
img_bytes = None
try:
send_log_to_discord(
f"Cover download HTTP `{img_resp.status}` for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`",
"WARNING",
target,
)
except Exception:
pass
except Exception as e:
img_bytes = None
try:
mf = mt_load_file(str(final_file))
# set basic tags
send_log_to_discord(
f"Cover download exception for track `{track_id} album_id={album_id} url={cover_url} artist={artist} album={album}`: `{e}`",
"WARNING",
target,
)
except Exception:
pass
# Try music_tag first
try:
from music_tag import load_file as mt_load_file # type: ignore
# Add validation for `mf` object
try:
mf = mt_load_file(str(final_file))
if mf is not None:
if md.get("title"):
mf["title"] = md.get("title")
if md.get("artist"):
@@ -429,37 +480,40 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
mf["artwork"] = img_bytes
mf.save()
embedded = True
except Exception:
else:
logger.error("Failed to load file with music_tag.")
embedded = False
except Exception:
embedded = False
# If music_tag not available or failed, fallback to mediafile tagging
if not embedded:
# If we had a cover_url but no bytes, log a warning to Discord
try:
if cover_url and not img_bytes:
send_log_to_discord(
f"Cover art not available for track {track_id} album_id={album_id} url={cover_url}",
"WARNING",
target,
)
except Exception:
pass
tag_with_mediafile(str(final_file), md)
except Exception:
# Ensure at least the basic tags are written
embedded = False
if not embedded:
try:
if cover_url and not img_bytes:
send_log_to_discord(
f"Cover art not available for track {track_id} album_id={album_id} url={cover_url}",
"WARNING",
target,
)
except Exception:
pass
try:
tag_with_mediafile(str(final_file), md)
except Exception:
pass
tmp_file = None
# Success
tmp_file = None
track_info["status"] = "Success"
track_info["file_path"] = str(final_file)
track_info["error"] = None
all_final_files.append(final_file)
print(
f"TRACK {track_id}: SUCCESS! Progress: {((i + 1) / total) * 100:.0f}%"
)
if job:
job.meta["progress"] = int(((i + 1) / total) * 100)
job.meta["tracks"] = per_track_meta + [track_info]
@@ -469,7 +523,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
except aiohttp.ClientResponseError as e:
msg = f"Track {track_id} attempt {attempt} ClientResponseError: {e}"
send_log_to_discord(msg, "WARNING", target)
if e.status == 429:
if getattr(e, "status", None) == 429:
wait_time = min(60, 2**attempt)
await asyncio.sleep(wait_time)
else:
@@ -662,7 +716,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(process_tracks())
return loop.run_until_complete(process_tracks(track_list))
except Exception as e:
send_log_to_discord(
f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target
@@ -672,3 +726,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.save_meta()
finally:
loop.close()
# Correct integration of FLAC stream check
async def process_tracks(track_list):
for i, track_id in enumerate(track_list or []):
combined_path = f"/tmp/{uuid.uuid4().hex}_combined.m4s" # Example path
if not await check_flac_stream(combined_path):
logger.error(f"No FLAC stream found in {combined_path}. Skipping file.")
continue
# Proceed with decoding pipeline

View File

@@ -19,15 +19,21 @@ class MetadataFetchError(Exception):
"""Raised when metadata fetch permanently fails after retries."""
# Suppress all logging output from this module and its children
for name in [__name__, "utils.sr_wrapper"]:
# Suppress noisy logging from this module and from the `streamrip` library
# We set propagate=False so messages don't bubble up to the root logger and
# attach a NullHandler where appropriate to avoid "No handler found" warnings.
for name in [__name__, "utils.sr_wrapper", "streamrip", "streamrip.client"]:
logger = logging.getLogger(name)
logger.setLevel(logging.INFO) # Temporarily set to INFO for debugging LRC
# Keep default level (or raise to WARNING) so non-important logs are dropped
try:
logger.setLevel(logging.WARNING)
except Exception:
pass
logger.propagate = False
for handler in logger.handlers:
handler.setLevel(logging.INFO)
# Also set the root logger to CRITICAL as a last resort (may affect global logging)
# logging.getLogger().setLevel(logging.CRITICAL)
# Ensure a NullHandler is present so logs don't propagate and no missing-handler
# warnings are printed when the package emits records.
if not any(isinstance(h, logging.NullHandler) for h in logger.handlers):
logger.addHandler(logging.NullHandler())
load_dotenv()
@@ -190,12 +196,23 @@ class SRUtil:
title_match = self.is_fuzzy_match(expected_title, found_title, threshold)
return artist_match and album_match and title_match
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
deduped = {}
def dedupe_by_key(
self, key: str | list[str], entries: list[dict]
) -> list[dict]:
"""Return entries de-duplicated by one or more keys."""
keys = [key] if isinstance(key, str) else list(key)
if not keys:
return entries
def normalize(value: Any) -> str:
return str(value or "").strip().lower()
deduped: dict[tuple[str, ...], dict] = {}
for entry in entries:
norm = entry[key].strip().lower()
if norm not in deduped:
deduped[norm] = entry
composite_key = tuple(normalize(entry.get(k)) for k in keys)
if composite_key not in deduped:
deduped[composite_key] = entry
return list(deduped.values())
def group_artists_by_name(
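The reworked dedupe_by_key accepts either a single key or a list of keys, so two entries only collapse when every listed field matches after case and whitespace normalization (this backs the change further down that dedupes albums on ["title", "releaseDate"]). A stand-alone sketch of the same composite-key idea on plain dicts, with invented sample data:

from typing import Any

def dedupe(entries: list[dict], keys: list[str]) -> list[dict]:
    # Keep the first entry seen for each normalized (key1, key2, ...) tuple.
    def norm(value: Any) -> str:
        return str(value or "").strip().lower()
    seen: dict[tuple[str, ...], dict] = {}
    for entry in entries:
        composite = tuple(norm(entry.get(k)) for k in keys)
        seen.setdefault(composite, entry)
    return list(seen.values())

albums = [
    {"title": "Greatest Hits", "releaseDate": "1998-01-01"},
    {"title": "greatest hits", "releaseDate": "1998-01-01"},  # duplicate, dropped
    {"title": "Greatest Hits", "releaseDate": "2015-06-01"},  # reissue, kept
]
print(len(dedupe(albums, ["title", "releaseDate"])))  # -> 2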
@@ -444,7 +461,7 @@ class SRUtil:
return None
if not metadata:
return None
albums = self.dedupe_by_key("title", metadata.get("albums", []))
albums = self.dedupe_by_key(["title", "releaseDate"], metadata.get("albums", []))
albums_out = [
{
"artist": ", ".join(artist["name"] for artist in album["artists"]),
@@ -684,21 +701,22 @@ class SRUtil:
except Exception as e:
# Exponential backoff with jitter for 429 or other errors
delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
logging.warning(
"Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs",
track_id,
attempt,
self.MAX_METADATA_RETRIES,
str(e),
delay,
)
if attempt < self.MAX_METADATA_RETRIES:
logging.warning(
"Retrying metadata fetch for track %s (attempt %d/%d): %s. Next retry in %.2fs",
track_id,
attempt,
self.MAX_METADATA_RETRIES,
str(e),
delay,
)
await asyncio.sleep(delay)
else:
logging.error(
"Metadata fetch failed permanently for track %s after %d attempts",
"Metadata fetch failed permanently for track %s after %d attempts: %s",
track_id,
self.MAX_METADATA_RETRIES,
str(e),
)
# Raise a specific exception so callers can react (e.g. notify)
raise MetadataFetchError(
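The retry path uses exponential backoff with jitter: the wait doubles on each attempt and gains up to half a second of random noise so concurrent workers do not retry in lockstep. A compact sketch of that schedule using the same formula; the base delay and retry count here are assumptions, the real values live on SRUtil:

import asyncio
import random

RETRY_DELAY = 1.0   # assumed base delay
MAX_RETRIES = 3     # assumed attempt limit

async def fetch_with_backoff(fetch):
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return await fetch()
        except Exception:
            if attempt == MAX_RETRIES:
                raise  # permanent failure, let the caller react
            delay = RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)  # ~1s, ~2s, ... plus jitter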
@@ -788,7 +806,7 @@ class SRUtil:
tracks_with_diff.sort(key=lambda x: x[1])
best_track, min_diff = tracks_with_diff[0]
logging.info(f"SR: Best match duration diff: {min_diff}s")
# If the closest match is more than 5 seconds off, consider no match
# If the closest match is more than 10 seconds off, consider no match
if min_diff > 10:
logging.info("SR: Duration diff too large, no match")
return None
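The widened duration check keeps the candidate whose length is closest to the target and rejects it if it is still more than 10 seconds off. The selection itself reduces to a min-by-absolute-difference, as in this sketch; the field names and sample values are assumptions:

def pick_by_duration(candidates: list[dict], target_seconds: int, max_diff: int = 10):
    # Candidates are assumed to carry a "duration" field in seconds.
    if not candidates:
        return None
    best = min(candidates, key=lambda t: abs(t["duration"] - target_seconds))
    if abs(best["duration"] - target_seconds) > max_diff:
        return None  # closest match is still too far off
    return best

print(pick_by_duration([{"id": 1, "duration": 215}, {"id": 2, "duration": 190}], 200))
# -> {'id': 2, 'duration': 190}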