import logging from fastapi import FastAPI, Request, Response, Depends from fastapi_throttle import RateLimiter from fastapi.responses import JSONResponse from utils.sr_wrapper import SRUtil from auth.deps import get_current_user from redis import Redis from rq import Queue, Retry from rq.job import Job from rq.job import JobStatus from rq.registry import ( StartedJobRegistry, DeferredJobRegistry, FinishedJobRegistry, FailedJobRegistry, ScheduledJobRegistry, ) from utils.rip_background import bulk_download from lyric_search.sources import private from typing import Literal from pydantic import BaseModel class ValidBulkFetchRequest(BaseModel): track_ids: list[int] target: str quality: Literal["FLAC", "Lossy"] = "FLAC" class RIP(FastAPI): """ Ripping Endpoints """ def __init__(self, app: FastAPI, my_util, constants) -> None: self.app: FastAPI = app self.util = my_util self.trip_util = SRUtil() self.constants = constants self.redis_conn = Redis( host="localhost", port=6379, db=0, password=private.REDIS_PW, ) self.task_queue = Queue( "dls", connection=self.redis_conn, default_timeout=14400, default_result_ttl=-1, default_failure_ttl=86400, ) self.endpoints: dict = { "trip/get_artists_by_name": self.artists_by_name_handler, "trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler, "trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler, "trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler, "trip/get_track_by_id/{track_id:path}": self.track_by_id_handler, "trip/bulk_fetch": self.bulk_fetch_handler, "trip/job/{job_id:path}": self.job_status_handler, "trip/jobs/list": self.job_list_handler, } for endpoint, handler in self.endpoints.items(): dependencies = [Depends(RateLimiter(times=8, seconds=2))] app.add_api_route( f"/{endpoint}", handler, methods=["GET"] if endpoint != "trip/bulk_fetch" else ["POST"], include_in_schema=True, dependencies=dependencies, ) def _format_job(self, job: Job): """Helper to normalize job data into JSON.""" job_status: str | JobStatus = job.get_status() progress = job.meta.get("progress", 0) if progress == 100 and not job.meta.get("tarball"): job_status = "compressing" tracks_in = job.meta.get("tracks_in") tracks_out = len(job.meta.get("tracks", [])) return { "id": job.id, "status": job_status, "result": job.result, "tarball": job.meta.get("tarball"), "enqueued_at": job.enqueued_at, "started_at": job.started_at, "ended_at": job.ended_at, "progress": progress, "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out, "target": job.meta.get("target"), "quality": job.meta.get("quality", "Unknown"), } async def artists_by_name_handler( self, artist: str, request: Request, user=Depends(get_current_user) ) -> Response: """Get artists by name""" artists = await self.trip_util.get_artists_by_name(artist) if not artists: return Response(status_code=404, content="Not found") return JSONResponse(content=artists) async def albums_by_artist_id_handler( self, artist_id: int, request: Request, user=Depends(get_current_user) ) -> Response: """Get albums by artist ID""" albums = await self.trip_util.get_albums_by_artist_id(artist_id) if not albums: return Response(status_code=404, content="Not found") return JSONResponse(content=albums) async def tracks_by_album_id_handler( self, album_id: int, request: Request, user=Depends(get_current_user), quality: Literal["FLAC", "Lossy"] = "FLAC" ) -> Response: """Get tracks by album id""" tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality) if not tracks: return Response(status_code=404, content="Not Found") return JSONResponse(content=tracks) async def tracks_by_artist_song_handler( self, artist: str, song: str, request: Request, user=Depends(get_current_user) ) -> Response: """Get tracks by artist and song name""" logging.critical("Searching for tracks by artist: %s, song: %s", artist, song) tracks = await self.trip_util.get_tracks_by_artist_song(artist, song) if not tracks: return Response(status_code=404, content="Not found") return JSONResponse(content=tracks) async def track_by_id_handler( self, track_id: int, request: Request, quality: Literal["FLAC", "Lossy"] = "FLAC", user=Depends(get_current_user), ) -> Response: """Get track by ID""" track = await self.trip_util.get_stream_url_by_track_id(track_id, quality) if not track: return Response(status_code=404, content="Not found") return JSONResponse(content={"stream_url": track}) async def bulk_fetch_handler( self, data: ValidBulkFetchRequest, request: Request, user=Depends(get_current_user), ) -> Response: """Bulk fetch a list of track IDs""" if not data or not data.track_ids or not data.target: return JSONResponse( content={ "err": True, "errorText": "Invalid data", } ) track_ids = data.track_ids target = data.target job = self.task_queue.enqueue( bulk_download, args=( track_ids, data.quality, ), job_timeout=14400, failure_ttl=86400, result_ttl=-1, retry=Retry(max=1, interval=[30]), meta={ "progress": 0, "status": "queued", "target": target, "tracks_in": len(track_ids), "quality": data.quality, }, ) self.redis_conn.lpush("enqueued_job_ids", job.id) return JSONResponse( content={ "job_id": job.id, "status": "queued", "target": job.meta.get("target", None), "quality": job.meta.get("quality", "Unknown"), } ) async def job_status_handler( self, job_id: str, request: Request, user=Depends(get_current_user) ): """Get status and result of a single job""" job = None try: # Try direct fetch first job = Job.fetch(job_id, connection=self.redis_conn) except Exception: # If not found, try registries explicitly (in case fetch failed because the job left the queue) registries = [ StartedJobRegistry(queue=self.task_queue), FinishedJobRegistry(queue=self.task_queue), FailedJobRegistry(queue=self.task_queue), DeferredJobRegistry(queue=self.task_queue), ScheduledJobRegistry(queue=self.task_queue), ] for registry in registries: if job_id in registry.get_job_ids(): try: job = Job.fetch(job_id, connection=self.redis_conn) except Exception: pass break if job is None: return JSONResponse({"error": "Job not found"}, status_code=404) return self._format_job(job) async def job_list_handler(self, request: Request, user=Depends(get_current_user)): """List all jobs across all registries (queued, started, finished, failed, etc).""" jobs_info = [] seen = set() # 1. Jobs still waiting in queue for job in self.task_queue.jobs: jobs_info.append(self._format_job(job)) seen.add(job.id) # 2. Jobs in Started/Finished/Failed/Deferred registries registries = [ StartedJobRegistry(queue=self.task_queue), FinishedJobRegistry(queue=self.task_queue), FailedJobRegistry(queue=self.task_queue), DeferredJobRegistry(queue=self.task_queue), ] for registry in registries: for jid in registry.get_job_ids(): if jid in seen: continue try: job = Job.fetch(jid, connection=self.redis_conn) jobs_info.append(self._format_job(job)) seen.add(job.id) except Exception: continue # job might have been cleaned up # 3. Jobs tracked in your custom enqueued_job_ids list job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1) for jid_bytes in job_ids: # type: ignore jid = jid_bytes.decode() if jid in seen: continue try: job = Job.fetch(jid, connection=self.redis_conn) jobs_info.append(self._format_job(job)) seen.add(job.id) except Exception: continue return {"jobs": jobs_info}