misc
This commit is contained in:
@@ -1,24 +1,46 @@
|
||||
# isort: skip_file
|
||||
from typing import Optional, Any, Callable
|
||||
from uuid import uuid4
|
||||
from urllib.parse import urlparse
|
||||
from pathlib import Path
|
||||
import hashlib
|
||||
import traceback
|
||||
import logging
|
||||
import random
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import aiohttp
|
||||
import time
|
||||
from streamrip.client import TidalClient # type: ignore
|
||||
from streamrip.config import Config as StreamripConfig # type: ignore
|
||||
from dotenv import load_dotenv
|
||||
from rapidfuzz import fuzz
|
||||
|
||||
# Monkey-patch streamrip's Tidal client credentials BEFORE importing TidalClient
|
||||
import streamrip.client.tidal as _tidal_module # type: ignore # noqa: E402
|
||||
_tidal_module.CLIENT_ID = "fX2JxdmntZWK0ixT"
|
||||
_tidal_module.CLIENT_SECRET = "1Nn9AfDAjxrgJFJbKNWLeAyKGVGmINuXPPLHVXAvxAg="
|
||||
_tidal_module.AUTH = aiohttp.BasicAuth(
|
||||
login=_tidal_module.CLIENT_ID,
|
||||
password=_tidal_module.CLIENT_SECRET
|
||||
)
|
||||
|
||||
from streamrip.client import TidalClient # type: ignore # noqa: E402
|
||||
from streamrip.config import Config as StreamripConfig # type: ignore # noqa: E402
|
||||
from dotenv import load_dotenv # noqa: E402
|
||||
from rapidfuzz import fuzz # noqa: E402
|
||||
|
||||
# Path to persist Tidal tokens across restarts
|
||||
TIDAL_TOKEN_CACHE_PATH = Path(__file__).parent.parent / "tidal_token.json"
|
||||
|
||||
|
||||
class MetadataFetchError(Exception):
|
||||
"""Raised when metadata fetch permanently fails after retries."""
|
||||
|
||||
|
||||
# How long before token expiry to proactively refresh (seconds)
|
||||
TIDAL_TOKEN_REFRESH_BUFFER = 600 # 10 minutes
|
||||
# Maximum age of a session before forcing a fresh login (seconds)
|
||||
TIDAL_SESSION_MAX_AGE = 1800 # 30 minutes
|
||||
|
||||
|
||||
# Suppress noisy logging from this module and from the `streamrip` library
|
||||
# We set propagate=False so messages don't bubble up to the root logger and
|
||||
# attach a NullHandler where appropriate to avoid "No handler found" warnings.
|
||||
@@ -47,27 +69,11 @@ class SRUtil:
|
||||
def __init__(self) -> None:
|
||||
"""Initialize StreamRip utility."""
|
||||
self.streamrip_config = StreamripConfig.defaults()
|
||||
self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "")
|
||||
self.streamrip_config.session.tidal.access_token = os.getenv(
|
||||
"tidal_access_token", ""
|
||||
)
|
||||
self.streamrip_config.session.tidal.refresh_token = os.getenv(
|
||||
"tidal_refresh_token", ""
|
||||
)
|
||||
self.streamrip_config.session.tidal.token_expiry = os.getenv(
|
||||
"tidal_token_expiry", ""
|
||||
)
|
||||
self.streamrip_config.session.tidal.country_code = os.getenv(
|
||||
"tidal_country_code", ""
|
||||
)
|
||||
self.streamrip_config.session.tidal.quality = int(
|
||||
os.getenv("tidal_default_quality", 2)
|
||||
)
|
||||
self._load_tidal_config()
|
||||
self.streamrip_config.session.conversion.enabled = False
|
||||
self.streamrip_config.session.downloads.folder = os.getenv(
|
||||
"tidal_download_folder", ""
|
||||
)
|
||||
self.streamrip_config
|
||||
self.streamrip_client = TidalClient(self.streamrip_config)
|
||||
self.MAX_CONCURRENT_METADATA_REQUESTS = 2
|
||||
self.METADATA_RATE_LIMIT = 1.25
|
||||
@@ -82,19 +88,328 @@ class SRUtil:
|
||||
self.on_rate_limit: Optional[Callable[[Exception], Any]] = None
|
||||
# Internal flag to avoid repeated notifications for the same runtime
|
||||
self._rate_limit_notified = False
|
||||
# Track when we last successfully logged in
|
||||
self._last_login_time: Optional[float] = None
|
||||
# Track last successful API call
|
||||
self._last_successful_request: Optional[float] = None
|
||||
# Keepalive task handle
|
||||
self._keepalive_task: Optional[asyncio.Task] = None
|
||||
# Keepalive interval in seconds
|
||||
self.KEEPALIVE_INTERVAL = 180 # 3 minutes
|
||||
|
||||
async def start_keepalive(self) -> None:
|
||||
"""Start the background keepalive task.
|
||||
|
||||
This should be called once at startup to ensure the Tidal session
|
||||
stays alive even during idle periods.
|
||||
"""
|
||||
if self._keepalive_task and not self._keepalive_task.done():
|
||||
logging.info("Tidal keepalive task already running")
|
||||
return
|
||||
|
||||
# Ensure initial login
|
||||
try:
|
||||
await self._login_and_persist()
|
||||
logging.info("Initial Tidal login successful")
|
||||
except Exception as e:
|
||||
logging.warning("Initial Tidal login failed: %s", e)
|
||||
|
||||
self._keepalive_task = asyncio.create_task(self._keepalive_runner())
|
||||
logging.info("Tidal keepalive task started")
|
||||
|
||||
async def stop_keepalive(self) -> None:
|
||||
"""Stop the background keepalive task."""
|
||||
if self._keepalive_task and not self._keepalive_task.done():
|
||||
self._keepalive_task.cancel()
|
||||
try:
|
||||
await self._keepalive_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logging.info("Tidal keepalive task stopped")
|
||||
|
||||
async def _keepalive_runner(self) -> None:
|
||||
"""Background task to keep the Tidal session alive."""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(self.KEEPALIVE_INTERVAL)
|
||||
|
||||
# Check if we've had recent activity
|
||||
if self._last_successful_request:
|
||||
time_since_last = time.time() - self._last_successful_request
|
||||
if time_since_last < self.KEEPALIVE_INTERVAL:
|
||||
# Recent activity, no need to ping
|
||||
continue
|
||||
|
||||
# Check if token is expiring soon and proactively refresh
|
||||
if self._is_token_expiring_soon():
|
||||
logging.info("Tidal keepalive: Token expiring soon, refreshing...")
|
||||
try:
|
||||
await self._login_and_persist(force=True)
|
||||
logging.info("Tidal keepalive: Token refresh successful")
|
||||
except Exception as e:
|
||||
logging.warning("Tidal keepalive: Token refresh failed: %s", e)
|
||||
continue
|
||||
|
||||
# Check if session is stale
|
||||
if self._is_session_stale():
|
||||
logging.info("Tidal keepalive: Session stale, refreshing...")
|
||||
try:
|
||||
await self._login_and_persist(force=True)
|
||||
logging.info("Tidal keepalive: Session refresh successful")
|
||||
except Exception as e:
|
||||
logging.warning("Tidal keepalive: Session refresh failed: %s", e)
|
||||
continue
|
||||
|
||||
# Make a lightweight API call to keep the session alive
|
||||
if self.streamrip_client.logged_in:
|
||||
try:
|
||||
# Simple search to keep the connection alive
|
||||
await self._safe_api_call(
|
||||
self.streamrip_client.search,
|
||||
media_type="artist",
|
||||
query="test",
|
||||
retries=1,
|
||||
)
|
||||
logging.debug("Tidal keepalive ping successful")
|
||||
except Exception as e:
|
||||
logging.warning("Tidal keepalive ping failed: %s", e)
|
||||
# Try to refresh the session
|
||||
try:
|
||||
await self._login_and_persist(force=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logging.info("Tidal keepalive task cancelled")
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error("Error in Tidal keepalive task: %s", e)
|
||||
|
||||
def _load_tidal_config(self) -> None:
|
||||
"""Load Tidal config from cache file if available, otherwise from env."""
|
||||
tidal = self.streamrip_config.session.tidal
|
||||
cached = self._load_cached_tokens()
|
||||
|
||||
if cached:
|
||||
tidal.user_id = cached.get("user_id", "")
|
||||
tidal.access_token = cached.get("access_token", "")
|
||||
tidal.refresh_token = cached.get("refresh_token", "")
|
||||
tidal.token_expiry = cached.get("token_expiry", "")
|
||||
tidal.country_code = cached.get("country_code", os.getenv("tidal_country_code", ""))
|
||||
else:
|
||||
tidal.user_id = os.getenv("tidal_user_id", "")
|
||||
tidal.access_token = os.getenv("tidal_access_token", "")
|
||||
tidal.refresh_token = os.getenv("tidal_refresh_token", "")
|
||||
tidal.token_expiry = os.getenv("tidal_token_expiry", "")
|
||||
tidal.country_code = os.getenv("tidal_country_code", "")
|
||||
|
||||
tidal.quality = int(os.getenv("tidal_default_quality", 2))
|
||||
|
||||
def _load_cached_tokens(self) -> Optional[dict]:
|
||||
"""Load cached tokens from disk if valid."""
|
||||
try:
|
||||
if TIDAL_TOKEN_CACHE_PATH.exists():
|
||||
with open(TIDAL_TOKEN_CACHE_PATH, "r") as f:
|
||||
data = json.load(f)
|
||||
# Validate required fields exist
|
||||
if all(k in data for k in ("access_token", "refresh_token", "token_expiry")):
|
||||
logging.info("Loaded Tidal tokens from cache")
|
||||
return data
|
||||
except Exception as e:
|
||||
logging.warning("Failed to load cached Tidal tokens: %s", e)
|
||||
return None
|
||||
|
||||
def _save_cached_tokens(self) -> None:
|
||||
"""Persist current tokens to disk for use across restarts."""
|
||||
try:
|
||||
tidal = self.streamrip_config.session.tidal
|
||||
data = {
|
||||
"user_id": tidal.user_id,
|
||||
"access_token": tidal.access_token,
|
||||
"refresh_token": tidal.refresh_token,
|
||||
"token_expiry": tidal.token_expiry,
|
||||
"country_code": tidal.country_code,
|
||||
}
|
||||
with open(TIDAL_TOKEN_CACHE_PATH, "w") as f:
|
||||
json.dump(data, f)
|
||||
logging.info("Saved Tidal tokens to cache")
|
||||
except Exception as e:
|
||||
logging.warning("Failed to save Tidal tokens: %s", e)
|
||||
|
||||
def _apply_new_tokens(self, auth_info: dict) -> None:
|
||||
"""Apply new tokens from device auth to config."""
|
||||
tidal = self.streamrip_config.session.tidal
|
||||
tidal.user_id = str(auth_info.get("user_id", ""))
|
||||
tidal.access_token = auth_info.get("access_token", "")
|
||||
tidal.refresh_token = auth_info.get("refresh_token", "")
|
||||
tidal.token_expiry = auth_info.get("token_expiry", "")
|
||||
tidal.country_code = auth_info.get("country_code", tidal.country_code)
|
||||
self._save_cached_tokens()
|
||||
|
||||
async def start_device_auth(self) -> tuple[str, str]:
|
||||
"""Start device authorization flow.
|
||||
|
||||
Returns:
|
||||
tuple: (device_code, verification_url) - User should visit the URL to authorize.
|
||||
"""
|
||||
if not hasattr(self.streamrip_client, 'session') or not self.streamrip_client.session:
|
||||
self.streamrip_client.session = await self.streamrip_client.get_session()
|
||||
|
||||
device_code, verification_url = await self.streamrip_client._get_device_code()
|
||||
return device_code, verification_url
|
||||
|
||||
async def check_device_auth(self, device_code: str) -> tuple[bool, Optional[str]]:
|
||||
"""Check if user has completed device authorization.
|
||||
|
||||
Args:
|
||||
device_code: The device code from start_device_auth()
|
||||
|
||||
Returns:
|
||||
tuple: (success, error_message)
|
||||
- (True, None) if auth completed successfully
|
||||
- (False, "pending") if user hasn't authorized yet
|
||||
- (False, error_message) if auth failed
|
||||
"""
|
||||
status, auth_info = await self.streamrip_client._get_auth_status(device_code)
|
||||
|
||||
if status == 0:
|
||||
# Success - apply new tokens
|
||||
self._apply_new_tokens(auth_info)
|
||||
# Re-login with new tokens
|
||||
self.streamrip_client.logged_in = False
|
||||
try:
|
||||
await self.streamrip_client.login()
|
||||
self._save_cached_tokens()
|
||||
return True, None
|
||||
except Exception as e:
|
||||
return False, f"Login after auth failed: {e}"
|
||||
elif status == 2:
|
||||
# Pending - user hasn't authorized yet
|
||||
return False, "pending"
|
||||
else:
|
||||
# Failed
|
||||
return False, "Authorization failed"
|
||||
|
||||
def _is_token_expiring_soon(self) -> bool:
|
||||
"""Check if the token is about to expire within the buffer window."""
|
||||
tidal = self.streamrip_config.session.tidal
|
||||
token_expiry = getattr(tidal, "token_expiry", None)
|
||||
if not token_expiry:
|
||||
return True # No expiry info means we should refresh
|
||||
try:
|
||||
# token_expiry is typically an ISO timestamp string
|
||||
if isinstance(token_expiry, str):
|
||||
from datetime import datetime
|
||||
expiry_dt = datetime.fromisoformat(token_expiry.replace('Z', '+00:00'))
|
||||
expiry_ts = expiry_dt.timestamp()
|
||||
else:
|
||||
expiry_ts = float(token_expiry)
|
||||
return expiry_ts < (time.time() + TIDAL_TOKEN_REFRESH_BUFFER)
|
||||
except Exception as e:
|
||||
logging.warning("Failed to parse token expiry '%s': %s", token_expiry, e)
|
||||
return True # Err on the side of refreshing
|
||||
|
||||
def _is_session_stale(self) -> bool:
|
||||
"""Check if the login session is too old and should be refreshed."""
|
||||
if not self._last_login_time:
|
||||
return True
|
||||
session_age = time.time() - self._last_login_time
|
||||
return session_age > TIDAL_SESSION_MAX_AGE
|
||||
|
||||
async def _force_fresh_login(self) -> bool:
|
||||
"""Force a complete fresh login, ignoring logged_in state.
|
||||
|
||||
Returns True if login succeeded, False otherwise.
|
||||
"""
|
||||
# Reset the logged_in flag to force a fresh login
|
||||
self.streamrip_client.logged_in = False
|
||||
|
||||
# Close existing session if present
|
||||
if hasattr(self.streamrip_client, 'session') and self.streamrip_client.session:
|
||||
try:
|
||||
if not self.streamrip_client.session.closed:
|
||||
await self.streamrip_client.session.close()
|
||||
except Exception as e:
|
||||
logging.warning("Error closing old session: %s", e)
|
||||
# Use object.__setattr__ to bypass type checking for session reset
|
||||
try:
|
||||
object.__setattr__(self.streamrip_client, 'session', None)
|
||||
except Exception:
|
||||
pass # Session will be recreated on next login
|
||||
|
||||
try:
|
||||
logging.info("Forcing fresh Tidal login...")
|
||||
await self.streamrip_client.login()
|
||||
self._last_login_time = time.time()
|
||||
self._save_cached_tokens()
|
||||
logging.info("Fresh Tidal login successful")
|
||||
return True
|
||||
except Exception as e:
|
||||
logging.warning("Forced Tidal login failed: %s - device re-auth may be required", e)
|
||||
return False
|
||||
|
||||
async def _login_and_persist(self, force: bool = False) -> None:
|
||||
"""Login to Tidal and persist any refreshed tokens.
|
||||
|
||||
Args:
|
||||
force: If True, force a fresh login even if already logged in.
|
||||
|
||||
This method now checks for:
|
||||
1. Token expiry - refreshes if token is about to expire
|
||||
2. Session age - refreshes if session is too old
|
||||
3. logged_in state - logs in if not logged in
|
||||
|
||||
If refresh fails, logs a warning but does not raise.
|
||||
"""
|
||||
needs_login = force or not self.streamrip_client.logged_in
|
||||
|
||||
# Check if token is expiring soon
|
||||
if not needs_login and self._is_token_expiring_soon():
|
||||
logging.info("Tidal token expiring soon, will refresh")
|
||||
needs_login = True
|
||||
|
||||
# Check if session is too old
|
||||
if not needs_login and self._is_session_stale():
|
||||
logging.info("Tidal session is stale, will refresh")
|
||||
needs_login = True
|
||||
|
||||
if not needs_login:
|
||||
return
|
||||
|
||||
try:
|
||||
# Reset logged_in to ensure fresh login attempt
|
||||
if force or self._is_token_expiring_soon():
|
||||
self.streamrip_client.logged_in = False
|
||||
|
||||
await self.streamrip_client.login()
|
||||
self._last_login_time = time.time()
|
||||
# After login, tokens may have been refreshed - persist them
|
||||
self._save_cached_tokens()
|
||||
logging.info("Tidal login/refresh successful")
|
||||
except Exception as e:
|
||||
logging.warning("Tidal login/refresh failed: %s - device re-auth may be required", e)
|
||||
# Don't mark as logged in on failure - let subsequent calls retry
|
||||
|
||||
async def rate_limited_request(self, func, *args, **kwargs):
|
||||
"""Rate-limited wrapper that also ensures login before making requests."""
|
||||
async with self.METADATA_SEMAPHORE:
|
||||
now = time.time()
|
||||
elapsed = now - self.LAST_METADATA_REQUEST
|
||||
if elapsed < self.METADATA_RATE_LIMIT:
|
||||
await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed)
|
||||
|
||||
# Ensure we're logged in before making the request
|
||||
try:
|
||||
await self._login_and_persist()
|
||||
except Exception as e:
|
||||
logging.warning("Pre-request login failed in rate_limited_request: %s", e)
|
||||
|
||||
result = await func(*args, **kwargs)
|
||||
self.LAST_METADATA_REQUEST = time.time()
|
||||
return result
|
||||
|
||||
async def _safe_api_call(
|
||||
self, func, *args, retries: int = 2, backoff: float = 0.5, **kwargs
|
||||
self, func, *args, retries: int = 3, backoff: float = 0.5, **kwargs
|
||||
):
|
||||
"""Call an async API function with resilient retry behavior.
|
||||
|
||||
@@ -103,18 +418,32 @@ class SRUtil:
|
||||
attempt a `login()` and retry up to `retries` times.
|
||||
- On 400/429 responses (message contains '400' or '429'): retry with backoff
|
||||
without triggering login (to avoid excessive logins).
|
||||
- On 401 (Unauthorized): force a fresh login and retry.
|
||||
|
||||
Returns the result or raises the last exception.
|
||||
"""
|
||||
last_exc: Optional[Exception] = None
|
||||
for attempt in range(retries):
|
||||
try:
|
||||
return await func(*args, **kwargs)
|
||||
# Before each attempt, ensure we have a valid session
|
||||
if attempt == 0:
|
||||
# On first attempt, try to ensure logged in (checks token expiry)
|
||||
# Wrapped in try/except so login failures don't block the API call
|
||||
try:
|
||||
await self._login_and_persist()
|
||||
except Exception as login_err:
|
||||
logging.warning("Pre-request login failed: %s (continuing anyway)", login_err)
|
||||
|
||||
result = await func(*args, **kwargs)
|
||||
# Track successful request
|
||||
self._last_successful_request = time.time()
|
||||
return result
|
||||
except AttributeError as e:
|
||||
# Probably missing/closed client internals: try re-login once
|
||||
last_exc = e
|
||||
logging.warning("AttributeError in API call (attempt %d/%d): %s", attempt + 1, retries, e)
|
||||
try:
|
||||
await self.streamrip_client.login()
|
||||
await self._force_fresh_login()
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
@@ -144,6 +473,31 @@ class SRUtil:
|
||||
await asyncio.sleep(backoff * (2**attempt))
|
||||
continue
|
||||
|
||||
# Treat 401 (Unauthorized) as an auth failure: force a fresh re-login then retry
|
||||
is_401_error = (
|
||||
(isinstance(e, aiohttp.ClientResponseError) and getattr(e, "status", None) == 401)
|
||||
or "401" in msg
|
||||
or "unauthorized" in msg.lower()
|
||||
)
|
||||
if is_401_error:
|
||||
logging.warning(
|
||||
"Received 401/Unauthorized from Tidal (attempt %d/%d). Forcing fresh re-login...",
|
||||
attempt + 1,
|
||||
retries,
|
||||
)
|
||||
try:
|
||||
# Use force=True to ensure we actually re-authenticate
|
||||
login_success = await self._force_fresh_login()
|
||||
if login_success:
|
||||
logging.info("Forced re-login after 401 successful")
|
||||
else:
|
||||
logging.warning("Forced re-login after 401 failed - may need device re-auth")
|
||||
except Exception as login_exc:
|
||||
logging.warning("Forced login after 401 failed: %s", login_exc)
|
||||
if attempt < retries - 1:
|
||||
await asyncio.sleep(backoff * (2**attempt))
|
||||
continue
|
||||
|
||||
# Connection related errors — try to re-login then retry
|
||||
if (
|
||||
isinstance(
|
||||
@@ -159,7 +513,7 @@ class SRUtil:
|
||||
or "closed" in msg.lower()
|
||||
):
|
||||
try:
|
||||
await self.streamrip_client.login()
|
||||
await self._login_and_persist(force=True)
|
||||
except Exception:
|
||||
pass
|
||||
if attempt < retries - 1:
|
||||
@@ -434,8 +788,6 @@ class SRUtil:
|
||||
|
||||
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
|
||||
"""Get albums by artist ID. Retry login only on authentication failure. Rate limit and retry on 400/429."""
|
||||
import asyncio
|
||||
|
||||
artist_id_str: str = str(artist_id)
|
||||
albums_out: list[dict] = []
|
||||
max_retries = 4
|
||||
@@ -585,26 +937,26 @@ class SRUtil:
|
||||
TODO: Reimplement using StreamRip
|
||||
"""
|
||||
try:
|
||||
# _safe_api_call already handles login, no need to call it here
|
||||
search_res = await self._safe_api_call(
|
||||
self.streamrip_client.search,
|
||||
media_type="track",
|
||||
query=f"{artist} - {song}",
|
||||
retries=3,
|
||||
)
|
||||
logging.critical("Result: %s", search_res)
|
||||
logging.debug("Search result: %s", search_res)
|
||||
return (
|
||||
search_res[0].get("items")
|
||||
if search_res and isinstance(search_res, list)
|
||||
else []
|
||||
)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
logging.critical("Search Exception: %s", str(e))
|
||||
if n < 3:
|
||||
logging.warning("Search Exception: %s", str(e))
|
||||
if n < 2: # Reduce max retries from 3 to 2
|
||||
n += 1
|
||||
await asyncio.sleep(0.5 * n) # Add backoff
|
||||
return await self.get_tracks_by_artist_song(artist, song, n)
|
||||
return []
|
||||
# return []
|
||||
|
||||
async def get_stream_url_by_track_id(
|
||||
self, track_id: int, quality: str = "FLAC"
|
||||
@@ -655,7 +1007,6 @@ class SRUtil:
|
||||
"""
|
||||
for attempt in range(1, self.MAX_METADATA_RETRIES + 1):
|
||||
try:
|
||||
await self._safe_api_call(self.streamrip_client.login, retries=1)
|
||||
# Track metadata
|
||||
metadata = await self.rate_limited_request(
|
||||
self.streamrip_client.get_metadata, str(track_id), "track"
|
||||
@@ -734,7 +1085,6 @@ class SRUtil:
|
||||
bool
|
||||
"""
|
||||
try:
|
||||
await self._safe_api_call(self.streamrip_client.login, retries=1)
|
||||
track_url = await self.get_stream_url_by_track_id(track_id)
|
||||
if not track_url:
|
||||
return False
|
||||
|
||||
25
utils/yt_utils.py
Normal file
25
utils/yt_utils.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from typing import Optional
|
||||
import hmac
|
||||
import hashlib
|
||||
import time
|
||||
import base64
|
||||
import os
|
||||
|
||||
VIDEO_PROXY_SECRET = os.environ.get("VIDEO_PROXY_SECRET", "").encode()
|
||||
|
||||
def sign_video_id(video_id: Optional[str|bool]) -> str:
|
||||
"""Generate a signed token for a video ID."""
|
||||
if not VIDEO_PROXY_SECRET or not video_id:
|
||||
return "" # Return empty if no secret configured
|
||||
|
||||
timestamp = int(time.time() * 1000) # milliseconds to match JS Date.now()
|
||||
payload = f"{video_id}:{timestamp}"
|
||||
signature = hmac.new(
|
||||
VIDEO_PROXY_SECRET,
|
||||
payload.encode(),
|
||||
hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
token_data = f"{payload}:{signature}"
|
||||
# base64url encode (no padding, to match JS base64url)
|
||||
return base64.urlsafe_b64encode(token_data.encode()).decode().rstrip("=")
|
||||
Reference in New Issue
Block a user