diff --git a/base.py b/base.py
index 80a0086..a5e5cea 100644
--- a/base.py
+++ b/base.py
@@ -10,7 +10,7 @@
 from fastapi.middleware.cors import CORSMiddleware
 from lyric_search.sources import redis_cache
 
 logging.basicConfig(level=logging.INFO)
-logging.getLogger("aiosqlite").setLevel(logging.WARNING)
+logging.getLogger("aiosqlite").setLevel(logging.WARNING)
 logging.getLogger("httpx").setLevel(logging.WARNING)
 logger = logging.getLogger()
diff --git a/endpoints/rand_msg.py b/endpoints/rand_msg.py
index aa4eb03..4be299a 100644
--- a/endpoints/rand_msg.py
+++ b/endpoints/rand_msg.py
@@ -41,16 +41,16 @@ class RandMsg(FastAPI):
         db_rand_selected: int = 9
         db_rand_selected = random.choice([3])
         title_attr: str = "Unknown"
+        randmsg_db_path: Optional[Union[str, LiteralString]] = None
+        db_query: Optional[str] = None
         match db_rand_selected:
             case 0:
-                randmsg_db_path: Union[str, LiteralString] = os.path.join(
+                randmsg_db_path = os.path.join(
                     "/usr/local/share", "sqlite_dbs", "qajoke.db"
                 )  # For qajoke db
-                db_query: str = (
-                    "SELECT id, ('Q: ' || question || '\nA: ' \
+                db_query = "SELECT id, ('Q: ' || question || '\nA: ' \
A: ' \ || answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db - ) title_attr = "QA Joke DB" case 1 | 9: randmsg_db_path = os.path.join( @@ -90,9 +90,20 @@ class RandMsg(FastAPI): WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1""" title_attr = "r/jokes DB" + if not randmsg_db_path: + return JSONResponse( + content={ + "err": True, + } + ) + async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db: async with await _db.execute(db_query) as _cursor: - result: sqlite3.Row = await _cursor.fetchone() + if not isinstance(_cursor, sqlite3.Cursor): + return JSONResponse(content={"err": True}) + result: Optional[sqlite3.Row] = await _cursor.fetchone() + if not result: + return JSONResponse(content={"err": True}) (result_id, result_msg) = result result_msg = result_msg.strip() return JSONResponse( diff --git a/endpoints/rip.py b/endpoints/rip.py index 109d208..eed1d47 100644 --- a/endpoints/rip.py +++ b/endpoints/rip.py @@ -4,6 +4,15 @@ from fastapi_throttle import RateLimiter from fastapi.responses import JSONResponse from utils.sr_wrapper import SRUtil from auth.deps import get_current_user +from redis import Redis +from rq import Queue +from rq.job import Job +from utils.rip_background import bulk_download +from lyric_search.sources import private +from pydantic import BaseModel + +class ValidBulkFetchRequest(BaseModel): + track_ids: list[int] class RIP(FastAPI): @@ -16,12 +25,27 @@ class RIP(FastAPI): self.util = my_util self.trip_util = SRUtil() self.constants = constants + self.redis_conn = Redis( + host="localhost", + port=6379, + db=0, + password=private.REDIS_PW, + ) + self.task_queue = Queue( + "dls", + connection=self.redis_conn, + default_result_ttl=86400, + default_failure_ttl=86400, + ) self.endpoints: dict = { "trip/get_artists_by_name": self.artists_by_name_handler, "trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler, "trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler, "trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler, "trip/get_track_by_id/{track_id:path}": self.track_by_id_handler, + "trip/bulk_fetch": self.bulk_fetch_handler, + "trip/job/{job_id:path}": self.job_status_handler, + "trip/jobs/list": self.job_list_handler, } for endpoint, handler in self.endpoints.items(): @@ -29,7 +53,7 @@ class RIP(FastAPI): app.add_api_route( f"/{endpoint}", handler, - methods=["GET"], + methods=["GET"] if endpoint != "trip/bulk_fetch" else ["POST"], include_in_schema=True, dependencies=dependencies, ) @@ -79,3 +103,87 @@ class RIP(FastAPI): if not track: return Response(status_code=404, content="Not found") return JSONResponse(content={"stream_url": track}) + + async def bulk_fetch_handler( + self, + data: ValidBulkFetchRequest, + request: Request, + user=Depends(get_current_user), + ) -> Response: + """Bulk fetch a list of track IDs""" + if not data or not data.track_ids: + return JSONResponse( + content={ + "err": True, + "errorText": "Invalid data", + } + ) + track_ids = data.track_ids + job = self.task_queue.enqueue(bulk_download, track_ids) + self.redis_conn.lpush("enqueued_job_ids", job.id) + return JSONResponse( + content={ + "job_id": job.id, + "status": "queued", + } + ) + + async def job_status_handler( + self, job_id: str, request: Request, user=Depends(get_current_user) + ): + """Get status and result of a single job""" + try: + job = Job.fetch(job_id, connection=self.redis_conn) + except Exception: + return JSONResponse({"error": "Job not found"}, status_code=404) + + return { + "id": 
+            "status": job.get_status(),
+            "result": job.result,
+            "enqueued_at": job.enqueued_at,
+            "started_at": job.started_at,
+            "ended_at": job.ended_at,
+        }
+
+    async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
+        """List all jobs in the queue (queued + finished, if result_ttl allows)"""
+        jobs_info = []
+
+        # 1️⃣ Jobs still in the queue (pending)
+        for job in self.task_queue.jobs:
+            jobs_info.append(
+                {
+                    "id": job.id,
+                    "status": job.get_status(),  # queued
+                    "result": job.result,
+                    "enqueued_at": job.enqueued_at,
+                    "progress": job.meta.get("progress", None),
+                }
+            )
+
+        # 2️⃣ Started/running jobs tracked via enqueued_job_ids
+        job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1)
+        for jid_bytes in job_ids:  # type: ignore
+            jid = jid_bytes.decode()
+            try:
+                job = Job.fetch(jid, connection=self.redis_conn)
+            except Exception:
+                continue  # job may have completed and expired
+
+            status = job.get_status()
+            if status in ("started", "queued", "finished"):
+                # avoid duplicates for jobs already in task_queue.jobs
+                if not any(j["id"] == job.id for j in jobs_info):
+                    jobs_info.append(
+                        {
+                            "id": job.id,
+                            "status": status,
+                            "result": job.result,
+                            "enqueued_at": job.enqueued_at,
+                            "progress": job.meta.get("progress", None),
+                            "tracks": job.meta.get("track_list", None),
+                        }
+                    )
+
+        return {"jobs": jobs_info}
diff --git a/utils/radio_util.py b/utils/radio_util.py
index df45e15..1465372 100644
--- a/utils/radio_util.py
+++ b/utils/radio_util.py
@@ -51,7 +51,7 @@ class RadioUtil:
             'main': self.constants.RADIO_DB_QUERY,
             'rap': self.constants.RADIO_DB_QUERY_RAP,
             'pop': self.constants.RADIO_DB_QUERY_POP,
-            'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
+            # 'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
             'rock': self.constants.RADIO_DB_QUERY_ROCK,
             'electronic': self.constants.RADIO_DB_QUERY_ELECTRONIC,
         }
@@ -77,7 +77,6 @@ class RadioUtil:
             "rock",
             "rap",
             "electronic",
-            "classical",
             "pop",
         ]
         self.active_playlist: dict[str, list[dict]] = {}
@@ -152,10 +151,10 @@ class RadioUtil:
         filter = filter.strip().lower()
         matched: list[dict] = []
         for item in self.active_playlist[station]:
-            artist: str = item.get("artist", None)
-            song: str = item.get("song", None)
-            artistsong: str = item.get("artistsong", None)
-            album: str = item.get("album", None)
+            artist: str = item.get("artist", "")
+            song: str = item.get("song", "")
+            artistsong: str = item.get("artistsong", "")
+            album: str = item.get("album", "")
             if not artist or not song or not artistsong:
                 continue
             if non_alnum.sub("", filter) in non_alnum.sub("", artistsong).lower():
@@ -201,6 +200,8 @@ class RadioUtil:
             search_song = song
         if not artistsong:
            artistsong = f"{search_artist} - {search_song}"
+        if not search_artist or not search_song or not artistsong:
+            raise RadioException("No query provided")
         search_params = (
             search_artist.lower(),
             search_song.lower(),
@@ -280,6 +281,8 @@ class RadioUtil:
         """
         try:
             added_rows: int = 0
+            artist = None
+            genre = None
             with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
                 for pair in pairs:
                     try:
@@ -388,7 +391,7 @@ class RadioUtil:
             for playlist in self.playlists:
                 playlist_redis_key: str = f"playlist:{playlist}"
-                _playlist = await self.redis_client.json().get(playlist_redis_key)
+                _playlist = await self.redis_client.json().get(playlist_redis_key)  # type: ignore
                 if playlist not in self.active_playlist.keys():
                     self.active_playlist[playlist] = []
                 if not playlist == "rock":
@@ -418,7 +421,7 @@ class RadioUtil:
                 logging.info("Removing duplicate tracks...")
                 dedupe_processed = []
                 for item in self.active_playlist[playlist]:
-                    artistsongabc: str = non_alnum.sub("", item.get("artistsong", None))
+                    artistsongabc: str = non_alnum.sub("", item.get("artistsong", ""))
                     if not artistsongabc:
                         logging.info("Missing artistsong: %s", item)
                         continue
diff --git a/utils/rip_background.py b/utils/rip_background.py
new file mode 100644
index 0000000..dd266c7
--- /dev/null
+++ b/utils/rip_background.py
@@ -0,0 +1,169 @@
+import logging
+import asyncio
+import random
+import os
+import tarfile
+import uuid
+import shutil
+from pathlib import Path
+from urllib.parse import urlparse, unquote
+
+import aiohttp
+from rq import get_current_job
+from utils.sr_wrapper import SRUtil
+
+# Configure logging
+logging.basicConfig(
+    level=logging.DEBUG,
+    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+)
+
+# Constants
+ROOT_DIR = Path("/storage/music2")  # Change to your music folder
+MAX_RETRIES = 3
+THROTTLE_MIN = 0.2
+THROTTLE_MAX = 1.5
+
+HEADERS = {
+    "User-Agent": (
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+        "AppleWebKit/537.36 (KHTML, like Gecko) "
+        "Chrome/116.0.5845.97 Safari/537.36"
+    ),
+    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
+    "Accept-Language": "en-US,en;q=0.9",
+    "Connection": "keep-alive",
+}
+
+# StreamRip utility
+sr = SRUtil()
+
+
+def bulk_download(track_list: list):
+    """
+    Full RQ-compatible bulk download job with:
+      - async per-track URL fetching
+      - retry on failure
+      - per-track success/failure
+      - metadata extraction
+      - organized file storage
+      - throttling
+      - per-artist tarball creation
+      - progress updates
+    """
+    job = get_current_job()
+
+    async def process_tracks():
+        per_track_meta = []
+        artist_files = {}  # artist -> list of files
+
+        async with aiohttp.ClientSession(headers=HEADERS) as session:
+            total = len(track_list)
+            logging.critical("Total tracks to process: %s", total)
+
+            for i, track_id in enumerate(track_list):
+                track_info = {
+                    "track_id": track_id,
+                    "status": "pending",
+                    "file_path": None,
+                    "error": None,
+                }
+                attempt = 0
+
+                while attempt < MAX_RETRIES:
+                    attempt += 1
+                    try:
+                        # 1️⃣ Get track URL
+                        url = await sr.get_stream_url_by_track_id(track_id)
+                        if not url:
+                            logging.critical(
+                                "Failed to get URL for track: %s", track_id
+                            )
+                            await asyncio.sleep(
+                                random.uniform(THROTTLE_MIN, THROTTLE_MAX)
+                            )
+                            continue
+
+                        # 2️⃣ Download file (chunked)
+                        parsed = urlparse(url)
+                        ext = Path(unquote(parsed.path)).suffix or ".mp3"
+                        tmp_file = Path(f"/tmp/{track_id}{ext}")
+
+                        async with session.get(url) as resp:
+                            resp.raise_for_status()
+                            with open(tmp_file, "wb") as f:
+                                async for chunk in resp.content.iter_chunked(64 * 1024):
+                                    f.write(chunk)
+
+                        # 3️⃣ Extract metadata
+                        metadata = await sr.get_metadata_by_track_id(track_id)
+                        if not metadata:
+                            logging.critical(
+                                "Failed to retrieve metadata for track ID: %s. \nSkipping",
Skipping", + track_id, + ) + continue + artist = metadata.get("artist", "Unknown Artist") + album = metadata.get("album", "Unknown Album") + title = metadata.get("song", "Unknown Song") + + logging.critical("Got metadata: %s/%s/%s", artist, album, title) + + # 4️⃣ Organize path + final_dir = ROOT_DIR / artist / album + final_dir.mkdir(parents=True, exist_ok=True) + final_file = final_dir / f"{title}{ext}" + tmp_file.rename(final_file) + + # 5️⃣ Track per-track info + track_info.update( + {"status": "success", "file_path": str(final_file)} + ) + artist_files.setdefault(artist, []).append(final_file) + + break # success + + except Exception as e: + logging.error("Error downloading track %s: %s", track_id, e) + track_info["error"] = str(e) + if attempt >= MAX_RETRIES: + track_info["status"] = "failed" + else: + # small delay before retry + await asyncio.sleep( + random.uniform(THROTTLE_MIN, THROTTLE_MAX) + ) + + # 6️⃣ Update RQ job meta + per_track_meta.append(track_info) + if job: + job.meta["progress"] = int((i + 1) / total * 100) + job.meta["tracks"] = per_track_meta + job.save_meta() + + # 7️⃣ Throttle between downloads + await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) + + # 8️⃣ Create per-artist tarballs + tarballs = [] + for artist, files in artist_files.items(): + short_id = uuid.uuid4().hex[:8] + tarball_name = ROOT_DIR / "completed" / f"{artist}_{short_id}.tar.gz" + with tarfile.open(tarball_name, "w:gz") as tar: + for f in files: + tar.add(f, arcname=f.name) + os.remove(f) # remove original file + logging.critical("Created tarball: %s", tarball_name) + tarballs.append(str(tarball_name)) + artist_dir = ROOT_DIR / artist + shutil.rmtree(artist_dir, ignore_errors=True) + + return tarballs + + # Run the async function synchronously + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(process_tracks()) + finally: + loop.close() diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py index 4bf8529..fe4bb18 100644 --- a/utils/sr_wrapper.py +++ b/utils/sr_wrapper.py @@ -1,10 +1,16 @@ from typing import Optional +from uuid import uuid4 +from urllib.parse import urlparse +import hashlib import logging import os import asyncio -from streamrip.client import TidalClient -from streamrip.config import Config as StreamripConfig +import aiohttp +from streamrip.client import TidalClient # type: ignore +from streamrip.config import Config as StreamripConfig # type: ignore from dotenv import load_dotenv + + load_dotenv() @@ -180,8 +186,7 @@ class SRUtil: Optional[str]: The stream URL or None if not found. 
""" if quality not in ["LOSSLESS", "HIGH", "LOW"]: - logging.error("Invalid quality requested: %s", - quality) + logging.error("Invalid quality requested: %s", quality) quality_int: int = int(self.streamrip_config.session.tidal.quality) match quality: case "HIGH": @@ -196,12 +201,12 @@ class SRUtil: try: track = await self.streamrip_client.get_downloadable( track_id=track_id_str, quality=quality_int - ) + ) except AttributeError: await self.streamrip_client.login() track = await self.streamrip_client.get_downloadable( track_id=track_id_str, quality=quality_int - ) + ) if not track: logging.warning("No track found for ID: %s", track_id) return None @@ -210,3 +215,54 @@ class SRUtil: logging.warning("No stream URL found for track ID: %s", track_id) return None return stream_url + + async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]: + if not self.streamrip_client.logged_in: + await self.streamrip_client.login() + try: + metadata = await self.streamrip_client.get_metadata(str(track_id), "track") + return { + "artist": metadata.get("artist", {}).get("name", "Unknown Artist"), + "album": metadata.get("album", {}).get("title", "Unknown Album"), + "song": metadata.get("title", uuid4()), + } + except Exception as e: + logging.critical( + "Get metadata for %s failed, Exception: %s", track_id, str(e) + ) + return None + + async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str: + """Download track + Args: + track_id (int) + quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW + Returns: + bool + """ + if not self.streamrip_client.logged_in: + await self.streamrip_client.login() + try: + track_url = await self.get_stream_url_by_track_id(track_id) + if not track_url: + return False + parsed_url = urlparse(track_url) + parsed_url_filename = os.path.basename(parsed_url.path) + parsed_url_ext = os.path.splitext(parsed_url_filename)[1] + unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16] + dl_folder_path = ( + f"{self.streamrip_config.session.downloads.folder}/{unique}" + ) + dl_path = f"{dl_folder_path}/{track_id}.{parsed_url_ext}" + async with aiohttp.ClientSession() as session: + async with session.get( + track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60) + ) as resp: + resp.raise_for_status() + with open(dl_path, "wb") as f: + async for chunk in resp.content.iter_chunked(1024 * 64): + f.write(chunk) + return dl_path + except Exception as e: + logging.critical("Error: %s", str(e)) + return False