import logging from fastapi import FastAPI, Request, Response, Depends from fastapi_throttle import RateLimiter from fastapi.responses import JSONResponse from utils.sr_wrapper import SRUtil from auth.deps import get_current_user from redis import Redis from rq import Queue from rq.job import Job from utils.rip_background import bulk_download from lyric_search.sources import private from pydantic import BaseModel class ValidBulkFetchRequest(BaseModel): track_ids: list[int] class RIP(FastAPI): """ Ripping Endpoints """ def __init__(self, app: FastAPI, my_util, constants) -> None: self.app: FastAPI = app self.util = my_util self.trip_util = SRUtil() self.constants = constants self.redis_conn = Redis( host="localhost", port=6379, db=0, password=private.REDIS_PW, ) self.task_queue = Queue( "dls", connection=self.redis_conn, default_timeout=3600, default_result_ttl=86400, default_failure_ttl=86400, ) self.endpoints: dict = { "trip/get_artists_by_name": self.artists_by_name_handler, "trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler, "trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler, "trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler, "trip/get_track_by_id/{track_id:path}": self.track_by_id_handler, "trip/bulk_fetch": self.bulk_fetch_handler, "trip/job/{job_id:path}": self.job_status_handler, "trip/jobs/list": self.job_list_handler, } for endpoint, handler in self.endpoints.items(): dependencies = [Depends(RateLimiter(times=8, seconds=2))] app.add_api_route( f"/{endpoint}", handler, methods=["GET"] if endpoint != "trip/bulk_fetch" else ["POST"], include_in_schema=True, dependencies=dependencies, ) async def artists_by_name_handler( self, artist: str, request: Request, user=Depends(get_current_user) ) -> Response: """Get artists by name""" artists = await self.trip_util.get_artists_by_name(artist) if not artists: return Response(status_code=404, content="Not found") return JSONResponse(content=artists) async def albums_by_artist_id_handler( self, artist_id: int, request: Request, user=Depends(get_current_user) ) -> Response: """Get albums by artist ID""" albums = await self.trip_util.get_albums_by_artist_id(artist_id) if not albums: return Response(status_code=404, content="Not found") return JSONResponse(content=albums) async def tracks_by_album_id_handler( self, album_id: int, request: Request, user=Depends(get_current_user) ) -> Response: """Get tracks by album id""" tracks = await self.trip_util.get_tracks_by_album_id(album_id) if not tracks: return Response(status_code=404, content="Not Found") return JSONResponse(content=tracks) async def tracks_by_artist_song_handler( self, artist: str, song: str, request: Request, user=Depends(get_current_user) ) -> Response: """Get tracks by artist and song name""" logging.critical("Searching for tracks by artist: %s, song: %s", artist, song) tracks = await self.trip_util.get_tracks_by_artist_song(artist, song) if not tracks: return Response(status_code=404, content="Not found") return JSONResponse(content=tracks) async def track_by_id_handler( self, track_id: int, request: Request, user=Depends(get_current_user) ) -> Response: """Get track by ID""" track = await self.trip_util.get_stream_url_by_track_id(track_id) if not track: return Response(status_code=404, content="Not found") return JSONResponse(content={"stream_url": track}) async def bulk_fetch_handler( self, data: ValidBulkFetchRequest, request: Request, user=Depends(get_current_user), ) -> Response: """Bulk fetch a list of track IDs""" if not data or not data.track_ids: return JSONResponse( content={ "err": True, "errorText": "Invalid data", } ) track_ids = data.track_ids job = self.task_queue.enqueue( bulk_download, args=(track_ids,), job_timeout=3600, failure_ttl=86400, result_ttl=86400, ) self.redis_conn.lpush("enqueued_job_ids", job.id) return JSONResponse( content={ "job_id": job.id, "status": "queued", } ) async def job_status_handler( self, job_id: str, request: Request, user=Depends(get_current_user) ): """Get status and result of a single job""" try: job = Job.fetch(job_id, connection=self.redis_conn) except Exception: return JSONResponse({"error": "Job not found"}, status_code=404) return { "id": job.id, "status": job.get_status(), "result": job.result, "enqueued_at": job.enqueued_at, "started_at": job.started_at, "ended_at": job.ended_at, } async def job_list_handler(self, request: Request, user=Depends(get_current_user)): """List all jobs in the queue (queued + finished, if result_ttl allows)""" jobs_info = [] # Jobs still in the queue (pending) for job in self.task_queue.jobs: jobs_info.append( { "id": job.id, "status": job.get_status(), # queued "result": job.result, "enqueued_at": job.enqueued_at, "progress": job.meta.get("progress", None), } ) # Started/running jobs tracked via enqueued_job_ids job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1) for jid_bytes in job_ids: # type: ignore jid = jid_bytes.decode() try: job = Job.fetch(jid, connection=self.redis_conn) except Exception: continue # job may have completed and expired status = job.get_status() if status in ("started", "queued", "finished"): # avoid duplicates for jobs already in task_queue.jobs if not any(j["id"] == job.id for j in jobs_info): jobs_info.append( { "id": job.id, "status": status, "result": job.result, "enqueued_at": job.enqueued_at, "progress": job.meta.get("progress", None), "tracks": job.meta.get("track_list", None), } ) return {"jobs": jobs_info}