misc / RQ bulk downloads for TRip

2025-08-15 13:31:15 -04:00
parent 72a7734152
commit 93050ec6cf
6 changed files with 368 additions and 21 deletions

View File

@@ -41,16 +41,16 @@ class RandMsg(FastAPI):
         db_rand_selected: int = 9
         db_rand_selected = random.choice([3])
         title_attr: str = "Unknown"
+        randmsg_db_path: Optional[Union[str, LiteralString]] = None
+        db_query: Optional[str] = None
         match db_rand_selected:
             case 0:
-                randmsg_db_path: Union[str, LiteralString] = os.path.join(
+                randmsg_db_path = os.path.join(
                     "/usr/local/share", "sqlite_dbs", "qajoke.db"
                 )  # For qajoke db
-                db_query: str = (
-                    "SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
-                        || answer) FROM jokes ORDER BY RANDOM() LIMIT 1"  # For qajoke db
-                )
+                db_query = "SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
+                    || answer) FROM jokes ORDER BY RANDOM() LIMIT 1"  # For qajoke db
                 title_attr = "QA Joke DB"
             case 1 | 9:
                 randmsg_db_path = os.path.join(
@@ -90,9 +90,20 @@ class RandMsg(FastAPI):
                     WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1"""
                 title_attr = "r/jokes DB"
+        if not randmsg_db_path:
+            return JSONResponse(
+                content={
+                    "err": True,
+                }
+            )
         async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db:
             async with await _db.execute(db_query) as _cursor:
-                result: sqlite3.Row = await _cursor.fetchone()
+                if not isinstance(_cursor, sqlite3.Cursor):
+                    return JSONResponse(content={"err": True})
+                result: Optional[sqlite3.Row] = await _cursor.fetchone()
+                if not result:
+                    return JSONResponse(content={"err": True})
                 (result_id, result_msg) = result
                 result_msg = result_msg.strip()
                 return JSONResponse(

View File

@@ -4,6 +4,15 @@ from fastapi_throttle import RateLimiter
 from fastapi.responses import JSONResponse
 from utils.sr_wrapper import SRUtil
 from auth.deps import get_current_user
+from redis import Redis
+from rq import Queue
+from rq.job import Job
+from utils.rip_background import bulk_download
+from lyric_search.sources import private
+from pydantic import BaseModel
+
+
+class ValidBulkFetchRequest(BaseModel):
+    track_ids: list[int]
 
 
 class RIP(FastAPI):
@@ -16,12 +25,27 @@ class RIP(FastAPI):
self.util = my_util self.util = my_util
self.trip_util = SRUtil() self.trip_util = SRUtil()
self.constants = constants self.constants = constants
self.redis_conn = Redis(
host="localhost",
port=6379,
db=0,
password=private.REDIS_PW,
)
self.task_queue = Queue(
"dls",
connection=self.redis_conn,
default_result_ttl=86400,
default_failure_ttl=86400,
)
self.endpoints: dict = { self.endpoints: dict = {
"trip/get_artists_by_name": self.artists_by_name_handler, "trip/get_artists_by_name": self.artists_by_name_handler,
"trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler, "trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler,
"trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler, "trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler,
"trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler, "trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler,
"trip/get_track_by_id/{track_id:path}": self.track_by_id_handler, "trip/get_track_by_id/{track_id:path}": self.track_by_id_handler,
"trip/bulk_fetch": self.bulk_fetch_handler,
"trip/job/{job_id:path}": self.job_status_handler,
"trip/jobs/list": self.job_list_handler,
} }
for endpoint, handler in self.endpoints.items(): for endpoint, handler in self.endpoints.items():
@@ -29,7 +53,7 @@ class RIP(FastAPI):
             app.add_api_route(
                 f"/{endpoint}",
                 handler,
-                methods=["GET"],
+                methods=["GET"] if endpoint != "trip/bulk_fetch" else ["POST"],
                 include_in_schema=True,
                 dependencies=dependencies,
             )
@@ -79,3 +103,87 @@ class RIP(FastAPI):
         if not track:
             return Response(status_code=404, content="Not found")
         return JSONResponse(content={"stream_url": track})
+
+    async def bulk_fetch_handler(
+        self,
+        data: ValidBulkFetchRequest,
+        request: Request,
+        user=Depends(get_current_user),
+    ) -> Response:
+        """Bulk fetch a list of track IDs"""
+        if not data or not data.track_ids:
+            return JSONResponse(
+                content={
+                    "err": True,
+                    "errorText": "Invalid data",
+                }
+            )
+        track_ids = data.track_ids
+        job = self.task_queue.enqueue(bulk_download, track_ids)
+        self.redis_conn.lpush("enqueued_job_ids", job.id)
+        return JSONResponse(
+            content={
+                "job_id": job.id,
+                "status": "queued",
+            }
+        )
+
+    async def job_status_handler(
+        self, job_id: str, request: Request, user=Depends(get_current_user)
+    ):
+        """Get status and result of a single job"""
+        try:
+            job = Job.fetch(job_id, connection=self.redis_conn)
+        except Exception:
+            return JSONResponse({"error": "Job not found"}, status_code=404)
+        return {
+            "id": job.id,
+            "status": job.get_status(),
+            "result": job.result,
+            "enqueued_at": job.enqueued_at,
+            "started_at": job.started_at,
+            "ended_at": job.ended_at,
+        }
+
+    async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
+        """List all jobs in the queue (queued + finished, if result_ttl allows)"""
+        jobs_info = []
+        # 1. Jobs still in the queue (pending)
+        for job in self.task_queue.jobs:
+            jobs_info.append(
+                {
+                    "id": job.id,
+                    "status": job.get_status(),  # queued
+                    "result": job.result,
+                    "enqueued_at": job.enqueued_at,
+                    "progress": job.meta.get("progress", None),
+                }
+            )
+        # 2. Started/running jobs tracked via enqueued_job_ids
+        job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1)
+        for jid_bytes in job_ids:  # type: ignore
+            jid = jid_bytes.decode()
+            try:
+                job = Job.fetch(jid, connection=self.redis_conn)
+            except Exception:
+                continue  # job may have completed and expired
+            status = job.get_status()
+            if status in ("started", "queued", "finished"):
+                # avoid duplicates for jobs already in task_queue.jobs
+                if not any(j["id"] == job.id for j in jobs_info):
+                    jobs_info.append(
+                        {
+                            "id": job.id,
+                            "status": status,
+                            "result": job.result,
+                            "enqueued_at": job.enqueued_at,
+                            "progress": job.meta.get("progress", None),
+                            "tracks": job.meta.get("track_list", None),
+                        }
+                    )
+        return {"jobs": jobs_info}
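For reference, a minimal client-side sketch of how the new routes could be exercised once deployed. The base URL, bearer token, and track IDs below are placeholders and not part of this commit; the real auth scheme is whatever get_current_user enforces.

import time
import requests  # any HTTP client works; requests is assumed here

BASE = "https://api.example.com"  # hypothetical host
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder credentials

# Enqueue a bulk download job (POST, per the methods override above)
resp = requests.post(
    f"{BASE}/trip/bulk_fetch",
    json={"track_ids": [101, 102, 103]},  # hypothetical track IDs
    headers=HEADERS,
)
job_id = resp.json()["job_id"]

# Poll the single-job endpoint until the job settles
while True:
    job = requests.get(f"{BASE}/trip/job/{job_id}", headers=HEADERS).json()
    if job["status"] not in ("queued", "started"):
        break
    time.sleep(5)

print(job["result"])  # tarball paths returned by bulk_download, or None on failure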

View File

@@ -51,7 +51,7 @@ class RadioUtil:
             'main': self.constants.RADIO_DB_QUERY,
             'rap': self.constants.RADIO_DB_QUERY_RAP,
             'pop': self.constants.RADIO_DB_QUERY_POP,
-            'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
+            # 'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
             'rock': self.constants.RADIO_DB_QUERY_ROCK,
             'electronic': self.constants.RADIO_DB_QUERY_ELECTRONIC,
         }
@@ -77,7 +77,6 @@ class RadioUtil:
             "rock",
             "rap",
             "electronic",
-            "classical",
             "pop",
         ]
         self.active_playlist: dict[str, list[dict]] = {}
@@ -152,10 +151,10 @@
         filter = filter.strip().lower()
         matched: list[dict] = []
         for item in self.active_playlist[station]:
-            artist: str = item.get("artist", None)
-            song: str = item.get("song", None)
-            artistsong: str = item.get("artistsong", None)
-            album: str = item.get("album", None)
+            artist: str = item.get("artist", "")
+            song: str = item.get("song", "")
+            artistsong: str = item.get("artistsong", "")
+            album: str = item.get("album", "")
             if not artist or not song or not artistsong:
                 continue
             if non_alnum.sub("", filter) in non_alnum.sub("", artistsong).lower():
@@ -201,6 +200,8 @@
             search_song = song
         if not artistsong:
             artistsong = f"{search_artist} - {search_song}"
+        if not search_artist or not search_song or not artistsong:
+            raise RadioException("No query provided")
         search_params = (
             search_artist.lower(),
             search_song.lower(),
@@ -280,6 +281,8 @@
         """
         try:
             added_rows: int = 0
+            artist = None
+            genre = None
             with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
                 for pair in pairs:
                     try:
@@ -388,7 +391,7 @@
         for playlist in self.playlists:
             playlist_redis_key: str = f"playlist:{playlist}"
-            _playlist = await self.redis_client.json().get(playlist_redis_key)
+            _playlist = await self.redis_client.json().get(playlist_redis_key)  # type: ignore
             if playlist not in self.active_playlist.keys():
                 self.active_playlist[playlist] = []
             if not playlist == "rock":
@@ -418,7 +421,7 @@
             logging.info("Removing duplicate tracks...")
             dedupe_processed = []
             for item in self.active_playlist[playlist]:
-                artistsongabc: str = non_alnum.sub("", item.get("artistsong", None))
+                artistsongabc: str = non_alnum.sub("", item.get("artistsong", ""))
                 if not artistsongabc:
                     logging.info("Missing artistsong: %s", item)
                     continue

View File

utils/rip_background.py (new file, 169 lines)

@@ -0,0 +1,169 @@
import logging
import asyncio
import random
import os
import tarfile
import uuid
import shutil
from pathlib import Path
from urllib.parse import urlparse, unquote

import aiohttp
from rq import get_current_job

from utils.sr_wrapper import SRUtil

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)

# Constants
ROOT_DIR = Path("/storage/music2")  # Change to your music folder
MAX_RETRIES = 3
THROTTLE_MIN = 0.2
THROTTLE_MAX = 1.5

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/116.0.5845.97 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Connection": "keep-alive",
}

# StreamRip utility
sr = SRUtil()


def bulk_download(track_list: list):
    """
    Full RQ-compatible bulk download job with:
    - async per-track URL fetching
    - retry on failure
    - per-track success/failure
    - metadata extraction
    - organized file storage
    - throttling
    - per-artist tarball creation
    - progress updates
    """
    job = get_current_job()

    async def process_tracks():
        per_track_meta = []
        artist_files = {}  # artist -> list of files

        async with aiohttp.ClientSession(headers=HEADERS) as session:
            total = len(track_list)
            logging.critical("Total tracks to process: %s", total)
            for i, track_id in enumerate(track_list):
                track_info = {
                    "track_id": track_id,
                    "status": "pending",
                    "file_path": None,
                    "error": None,
                }
                attempt = 0
                while attempt < MAX_RETRIES:
                    attempt += 1
                    try:
                        # 1. Get track URL
                        url = await sr.get_stream_url_by_track_id(track_id)
                        if not url:
                            logging.critical(
                                "Failed to get URL for track: %s", track_id
                            )
                            await asyncio.sleep(
                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
                            )
                            continue

                        # 2. Download file (chunked)
                        parsed = urlparse(url)
                        ext = Path(unquote(parsed.path)).suffix or ".mp3"
                        tmp_file = Path(f"/tmp/{track_id}{ext}")
                        async with session.get(url) as resp:
                            resp.raise_for_status()
                            with open(tmp_file, "wb") as f:
                                async for chunk in resp.content.iter_chunked(64 * 1024):
                                    f.write(chunk)

                        # 3. Extract metadata
                        metadata = await sr.get_metadata_by_track_id(track_id)
                        if not metadata:
                            logging.critical(
                                "Failed to retrieve metadata for track ID: %s. Skipping",
                                track_id,
                            )
                            continue
                        artist = metadata.get("artist", "Unknown Artist")
                        album = metadata.get("album", "Unknown Album")
                        title = metadata.get("song", "Unknown Song")
                        logging.critical("Got metadata: %s/%s/%s", artist, album, title)

                        # 4. Organize path
                        final_dir = ROOT_DIR / artist / album
                        final_dir.mkdir(parents=True, exist_ok=True)
                        final_file = final_dir / f"{title}{ext}"
                        tmp_file.rename(final_file)

                        # 5. Track per-track info
                        track_info.update(
                            {"status": "success", "file_path": str(final_file)}
                        )
                        artist_files.setdefault(artist, []).append(final_file)
                        break  # success
                    except Exception as e:
                        logging.error("Error downloading track %s: %s", track_id, e)
                        track_info["error"] = str(e)
                        if attempt >= MAX_RETRIES:
                            track_info["status"] = "failed"
                        else:
                            # small delay before retry
                            await asyncio.sleep(
                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
                            )

                # 6. Update RQ job meta
                per_track_meta.append(track_info)
                if job:
                    job.meta["progress"] = int((i + 1) / total * 100)
                    job.meta["tracks"] = per_track_meta
                    job.save_meta()

                # 7. Throttle between downloads
                await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))

        # 8. Create per-artist tarballs
        tarballs = []
        for artist, files in artist_files.items():
            short_id = uuid.uuid4().hex[:8]
            tarball_name = ROOT_DIR / "completed" / f"{artist}_{short_id}.tar.gz"
            with tarfile.open(tarball_name, "w:gz") as tar:
                for f in files:
                    tar.add(f, arcname=f.name)
                    os.remove(f)  # remove original file
            logging.critical("Created tarball: %s", tarball_name)
            tarballs.append(str(tarball_name))
            artist_dir = ROOT_DIR / artist
            shutil.rmtree(artist_dir, ignore_errors=True)

        return tarballs

    # Run the async function synchronously
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(process_tracks())
    finally:
        loop.close()
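These jobs only execute once an RQ worker is listening on the dls queue. A minimal worker entry point might look like the following; it mirrors the connection settings used in RIP.__init__ and is an assumption about the deployment, not part of this commit.

from redis import Redis
from rq import Queue, Worker

from lyric_search.sources import private  # same credential source the API uses

# Assumed to match the Redis connection configured in RIP.__init__
redis_conn = Redis(host="localhost", port=6379, db=0, password=private.REDIS_PW)

# Process jobs from the "dls" queue that bulk_fetch_handler enqueues into
worker = Worker([Queue("dls", connection=redis_conn)], connection=redis_conn)
worker.work()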

View File

@@ -1,10 +1,16 @@
 from typing import Optional
+from uuid import uuid4
+from urllib.parse import urlparse
+import hashlib
 import logging
 import os
 import asyncio
-from streamrip.client import TidalClient
-from streamrip.config import Config as StreamripConfig
+import aiohttp
+from streamrip.client import TidalClient  # type: ignore
+from streamrip.config import Config as StreamripConfig  # type: ignore
 from dotenv import load_dotenv
 
 load_dotenv()
@@ -180,8 +186,7 @@
             Optional[str]: The stream URL or None if not found.
         """
         if quality not in ["LOSSLESS", "HIGH", "LOW"]:
-            logging.error("Invalid quality requested: %s",
-                          quality)
+            logging.error("Invalid quality requested: %s", quality)
         quality_int: int = int(self.streamrip_config.session.tidal.quality)
         match quality:
             case "HIGH":
@@ -196,12 +201,12 @@
         try:
             track = await self.streamrip_client.get_downloadable(
                 track_id=track_id_str, quality=quality_int
             )
         except AttributeError:
             await self.streamrip_client.login()
             track = await self.streamrip_client.get_downloadable(
                 track_id=track_id_str, quality=quality_int
             )
         if not track:
             logging.warning("No track found for ID: %s", track_id)
             return None
@@ -210,3 +215,54 @@
             logging.warning("No stream URL found for track ID: %s", track_id)
             return None
         return stream_url
+
+    async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
+        if not self.streamrip_client.logged_in:
+            await self.streamrip_client.login()
+        try:
+            metadata = await self.streamrip_client.get_metadata(str(track_id), "track")
+            return {
+                "artist": metadata.get("artist", {}).get("name", "Unknown Artist"),
+                "album": metadata.get("album", {}).get("title", "Unknown Album"),
+                "song": metadata.get("title", uuid4()),
+            }
+        except Exception as e:
+            logging.critical(
+                "Get metadata for %s failed, Exception: %s", track_id, str(e)
+            )
+            return None
+
+    async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str:
+        """Download track
+
+        Args:
+            track_id (int)
+            quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
+
+        Returns:
+            bool
+        """
+        if not self.streamrip_client.logged_in:
+            await self.streamrip_client.login()
+        try:
+            track_url = await self.get_stream_url_by_track_id(track_id)
+            if not track_url:
+                return False
+            parsed_url = urlparse(track_url)
+            parsed_url_filename = os.path.basename(parsed_url.path)
+            parsed_url_ext = os.path.splitext(parsed_url_filename)[1]
+            unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
+            dl_folder_path = (
+                f"{self.streamrip_config.session.downloads.folder}/{unique}"
+            )
+            dl_path = f"{dl_folder_path}/{track_id}.{parsed_url_ext}"
+            async with aiohttp.ClientSession() as session:
+                async with session.get(
+                    track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60)
+                ) as resp:
+                    resp.raise_for_status()
+                    with open(dl_path, "wb") as f:
+                        async for chunk in resp.content.iter_chunked(1024 * 64):
+                            f.write(chunk)
+            return dl_path
+        except Exception as e:
+            logging.critical("Error: %s", str(e))
+            return False
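A rough sketch of driving the new SRUtil helpers directly from an asyncio script, for example during manual testing; the track ID is hypothetical and Tidal credentials are assumed to already be configured via the environment that load_dotenv() reads.

import asyncio

from utils.sr_wrapper import SRUtil


async def main() -> None:
    sr = SRUtil()
    track_id = 123456789  # hypothetical track ID

    meta = await sr.get_metadata_by_track_id(track_id)
    print(meta)  # {"artist": ..., "album": ..., "song": ...} or None on failure

    path = await sr.download(track_id, quality="LOSSLESS")
    print(path)  # path to the downloaded file, or False on failure


asyncio.run(main())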