another commit without a list of specific changes! (misc)
endpoints/rip.py (168 lines changed)
@@ -5,17 +5,23 @@ from fastapi.responses import JSONResponse
 from utils.sr_wrapper import SRUtil
 from auth.deps import get_current_user
 from redis import Redis
-from rq import Queue
+from rq import Queue, Retry
 from rq.job import Job
 from rq.job import JobStatus
+from rq.registry import (
+    StartedJobRegistry, DeferredJobRegistry,
+    FinishedJobRegistry, FailedJobRegistry,
+    ScheduledJobRegistry)
 from utils.rip_background import bulk_download
 from lyric_search.sources import private
+from typing import Literal
 from pydantic import BaseModel


 class ValidBulkFetchRequest(BaseModel):
     track_ids: list[int]
     target: str
+    quality: Literal["FLAC", "Lossy"]


 class RIP(FastAPI):
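
The new request model constrains quality to the two literal values up front, so an invalid payload is rejected before any job is enqueued. A minimal sketch of what it accepts (the payload values here are invented; model_dump() assumes pydantic v2, on v1 it would be .dict()):

    from typing import Literal
    from pydantic import BaseModel

    class ValidBulkFetchRequest(BaseModel):
        track_ids: list[int]
        target: str
        quality: Literal["FLAC", "Lossy"]

    req = ValidBulkFetchRequest(track_ids=[101, 102], target="library", quality="FLAC")
    print(req.model_dump())  # {'track_ids': [101, 102], 'target': 'library', 'quality': 'FLAC'}
    # quality="MP3" would raise a ValidationError
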
@@ -38,7 +44,7 @@ class RIP(FastAPI):
             "dls",
             connection=self.redis_conn,
             default_timeout=14400,
-            default_result_ttl=86400,
+            default_result_ttl=-1,
             default_failure_ttl=86400,
         )
         self.endpoints: dict = {
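
In RQ a result TTL of -1 means the result never expires, so this change keeps finished-job results around indefinitely while failed jobs are still cleaned up after a day. A sketch mirroring the constructor call from the diff, assuming a local Redis (the app passes self.redis_conn):

    from redis import Redis
    from rq import Queue

    task_queue = Queue(
        "dls",
        connection=Redis(),
        default_timeout=14400,      # jobs may run up to 4 hours
        default_result_ttl=-1,      # -1 = never expire results (was 86400, i.e. 24 h)
        default_failure_ttl=86400,  # failed jobs still expire after 24 h
    )
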
@@ -62,6 +68,30 @@ class RIP(FastAPI):
             dependencies=dependencies,
         )

+    def _format_job(self, job: Job):
+        """Helper to normalize job data into JSON."""
+        job_status: str | JobStatus = job.get_status()
+        progress = job.meta.get("progress", 0)
+        if progress == 100 and not job.meta.get("tarball"):
+            job_status = "compressing"
+
+        tracks_in = job.meta.get("tracks_in")
+        tracks_out = len(job.meta.get("tracks", []))
+
+        return {
+            "id": job.id,
+            "status": job_status,
+            "result": job.result,
+            "tarball": job.meta.get("tarball"),
+            "enqueued_at": job.enqueued_at,
+            "started_at": job.started_at,
+            "ended_at": job.ended_at,
+            "progress": progress,
+            "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out,
+            "target": job.meta.get("target"),
+            "quality": job.meta.get("quality", "Unknown"),
+        }
+
     async def artists_by_name_handler(
         self, artist: str, request: Request, user=Depends(get_current_user)
     ) -> Response:
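
Every job-reporting path now funnels through _format_job, so clients get one shape regardless of where the job was found. Roughly what one normalized entry looks like (all values here are made up):

    example = {
        "id": "a1b2c3d4",
        "status": "compressing",  # reported while progress == 100 but no tarball exists yet
        "result": None,
        "tarball": None,
        "enqueued_at": "2025-01-01T12:00:00",
        "started_at": "2025-01-01T12:00:05",
        "ended_at": None,
        "progress": 100,
        "tracks": "10 / 12",      # "done / requested" when tracks_in is an int, else just the done count
        "target": "library",
        "quality": "FLAC",
    }
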
@@ -81,10 +111,11 @@ class RIP(FastAPI):
         return JSONResponse(content=albums)

     async def tracks_by_album_id_handler(
-        self, album_id: int, request: Request, user=Depends(get_current_user)
+        self, album_id: int, request: Request, user=Depends(get_current_user),
+        quality: str = "FLAC"
     ) -> Response:
         """Get tracks by album id"""
-        tracks = await self.trip_util.get_tracks_by_album_id(album_id)
+        tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality)
         if not tracks:
             return Response(status_code=404, content="Not Found")
         return JSONResponse(content=tracks)
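
Because these handlers are registered as FastAPI routes, the new quality argument surfaces as a query parameter: optional with a FLAC default here, and required in track_by_id_handler in the next hunk, where it has no default. A hypothetical client call (httpx is an assumption, and the actual route path lives in self.endpoints, which this diff does not show):

    import httpx  # any HTTP client works; httpx chosen for the sketch

    resp = httpx.get(
        "http://localhost:8000/tracks_by_album_id/12345",  # hypothetical path
        params={"quality": "Lossy"},                       # omit to fall back to FLAC
    )
    print(resp.json())
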
@@ -100,10 +131,10 @@ class RIP(FastAPI):
         return JSONResponse(content=tracks)

     async def track_by_id_handler(
-        self, track_id: int, request: Request, user=Depends(get_current_user)
+        self, track_id: int, quality: str, request: Request, user=Depends(get_current_user)
     ) -> Response:
         """Get track by ID"""
-        track = await self.trip_util.get_stream_url_by_track_id(track_id)
+        track = await self.trip_util.get_stream_url_by_track_id(track_id, quality)
         if not track:
             return Response(status_code=404, content="Not found")
         return JSONResponse(content={"stream_url": track})
@@ -126,15 +157,17 @@ class RIP(FastAPI):
         target = data.target
         job = self.task_queue.enqueue(
             bulk_download,
-            args=(track_ids,),
+            args=(track_ids, data.quality,),
             job_timeout=14400,
             failure_ttl=86400,
-            result_ttl=86400,
+            result_ttl=-1,
+            retry=Retry(max=1, interval=[30]),
             meta={
                 'progress': 0,
                 'status': 'queued',
                 'target': target,
                 'tracks_in': len(track_ids),
                 'quality': data.quality,
             }

         )
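
With retry=Retry(max=1, interval=[30]), a bulk_download that raises is re-enqueued once, roughly 30 seconds after the failure, instead of landing straight in the failed registry. A standalone sketch of the same enqueue shape (demo_task stands in for bulk_download; in real use RQ needs a function the worker can import):

    from redis import Redis
    from rq import Queue, Retry

    def demo_task(track_ids, quality):
        return len(track_ids)

    q = Queue("dls", connection=Redis())
    job = q.enqueue(
        demo_task,
        args=([101, 102], "FLAC"),
        retry=Retry(max=1, interval=[30]),  # one retry, ~30 s after the failure
        result_ttl=-1,                      # keep the result indefinitely
        meta={"progress": 0, "status": "queued", "tracks_in": 2, "quality": "FLAC"},
    )
    print(job.id)
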
@@ -143,90 +176,79 @@ class RIP(FastAPI):
             content={
                 "job_id": job.id,
                 "status": "queued",
-                "target": job.meta.get("target", None)
+                "target": job.meta.get("target", None),
+                "quality": job.meta.get("quality", "Unknown"),
             }
         )

-    async def job_status_handler(
-        self, job_id: str, request: Request, user=Depends(get_current_user)
-    ):
+    async def job_status_handler(self, job_id: str, request: Request, user=Depends(get_current_user)):
         """Get status and result of a single job"""

+        job = None
+        try:
+            # Try direct fetch first
+            job = Job.fetch(job_id, connection=self.redis_conn)
+        except Exception:
+            # If not found, try registries explicitly (in case fetch failed because the job left the queue)
+            registries = [
+                StartedJobRegistry(queue=self.task_queue),
+                FinishedJobRegistry(queue=self.task_queue),
+                FailedJobRegistry(queue=self.task_queue),
+                DeferredJobRegistry(queue=self.task_queue),
+                ScheduledJobRegistry(queue=self.task_queue),
+            ]
+            for registry in registries:
+                if job_id in registry.get_job_ids():
+                    try:
+                        job = Job.fetch(job_id, connection=self.redis_conn)
+                    except Exception:
+                        pass
+                    break
+
+        if job is None:
+            return JSONResponse({"error": "Job not found"}, status_code=404)
-        job_status: str|JobStatus = job.get_status()
-        job_progress = job.meta.get("progress", 0)
-        job_tarball = job.meta.get("tarball")
-        if job_progress == 100 and not job_tarball:
-            job_status = "compressing"
-        tracks_out = len(job.meta.get("tracks", []))
-        tracks_in = job.meta.get("tracks_in", None)
-
-        return {
-            "id": job.id,
-            "status": job_status,
-            "result": job.result,
-            "enqueued_at": job.enqueued_at,
-            "started_at": job.started_at,
-            "ended_at": job.ended_at,
-            "progress": job_progress,
-            "target": job.meta.get("target"),
-            "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out,
-            "tarball": job_tarball,
-        }
+        return self._format_job(job)

     async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
-        """List all jobs in the queue (queued + finished, if result_ttl allows)"""
+        """List all jobs across all registries (queued, started, finished, failed, etc)."""
         jobs_info = []
+        seen = set()

-        # Jobs still in the queue (pending)
+        # 1. Jobs still waiting in queue
         for job in self.task_queue.jobs:
-            status: str|JobStatus = job.get_status()
-            job_progress = job.meta.get("progress", 0)
-            tarball = job.meta.get("tarball")
-            if job_progress == 100 and not tarball:
-                status = "compressing"
-            jobs_info.append(
-                {
-                    "id": job.id,
-                    "status": status,  # queued
-                    "result": job.result,
-                    "enqueued_at": job.enqueued_at,
-                    "progress": job.meta.get("progress", 0),
-                    "target": job.meta.get("target", None)
-                }
-            )
+            jobs_info.append(self._format_job(job))
+            seen.add(job.id)

-        # Running jobs tracked via enqueued_job_ids
+        # 2. Jobs in Started/Finished/Failed/Deferred registries
+        registries = [
+            StartedJobRegistry(queue=self.task_queue),
+            FinishedJobRegistry(queue=self.task_queue),
+            FailedJobRegistry(queue=self.task_queue),
+            DeferredJobRegistry(queue=self.task_queue),
+        ]
+        for registry in registries:
+            for jid in registry.get_job_ids():
+                if jid in seen:
+                    continue
+                try:
+                    job = Job.fetch(jid, connection=self.redis_conn)
+                    jobs_info.append(self._format_job(job))
+                    seen.add(job.id)
+                except Exception:
+                    continue  # job might have been cleaned up
+
+        # 3. Jobs tracked in your custom enqueued_job_ids list
         job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1)
         for jid_bytes in job_ids:  # type: ignore
             jid = jid_bytes.decode()
+            if jid in seen:
+                continue
             try:
                 job = Job.fetch(jid, connection=self.redis_conn)
+                jobs_info.append(self._format_job(job))
+                seen.add(job.id)
             except Exception:
                 continue  # job may have completed and expired
-
-            job_status: str|JobStatus = job.get_status()
-            job_progress = job.meta.get("progress", 0)
-            if job_progress == 100 and not job.meta.get("tarball"):
-                job_status = "compressing"
-            if job_status in ("started", "queued", "finished", "failed", "compressing"):
-                # avoid duplicates for jobs already in task_queue.jobs
-                if not any(j["id"] == job.id for j in jobs_info):
-                    tracks_in = job.meta.get("tracks_in", None)
-                    tracks_out = len(job.meta.get("tracks", []))
-                    jobs_info.append(
-                        {
-                            "id": job.id,
-                            "status": job_status,
-                            "result": job.result,
-                            "tarball": job.meta.get("tarball", None),
-                            "enqueued_at": job.enqueued_at,
-                            "progress": job.meta.get("progress", 0),
-                            "tracks": f"{tracks_out} / {tracks_in}" if isinstance(tracks_in, int) else tracks_out,
-                            "target": job.meta.get("target", None),
-                        }
-                    )
-            continue

         return {"jobs": jobs_info}
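
The same three-step sweep can be reproduced outside the app; a rough sketch under the same assumptions as above (local Redis, queue "dls"), with the seen set deduplicating across the queue, the registries, and the custom enqueued_job_ids list:

    from redis import Redis
    from rq import Queue
    from rq.job import Job
    from rq.registry import (
        StartedJobRegistry, FinishedJobRegistry,
        FailedJobRegistry, DeferredJobRegistry,
    )

    redis_conn = Redis()
    q = Queue("dls", connection=redis_conn)

    seen, jobs = set(), []
    for job in q.jobs:                      # 1. still waiting in the queue
        jobs.append(job)
        seen.add(job.id)
    for registry in (StartedJobRegistry(queue=q), FinishedJobRegistry(queue=q),
                     FailedJobRegistry(queue=q), DeferredJobRegistry(queue=q)):
        for jid in registry.get_job_ids():  # 2. started / finished / failed / deferred
            if jid not in seen:
                try:
                    jobs.append(Job.fetch(jid, connection=redis_conn))
                    seen.add(jid)
                except Exception:
                    pass                    # expired or cleaned up
    for raw in redis_conn.lrange("enqueued_job_ids", 0, -1):  # 3. custom list
        jid = raw.decode()
        if jid not in seen:
            try:
                jobs.append(Job.fetch(jid, connection=redis_conn))
                seen.add(jid)
            except Exception:
                pass
    print(f"{len(jobs)} jobs visible")
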