RIP: capitalize RQ job statuses in related endpoints, order job list, other: minor/typing

2025-08-23 08:20:32 -04:00
parent a8d089c0fe
commit a11748775e
4 changed files with 59 additions and 19 deletions

View File

@@ -3,11 +3,18 @@ import time
 import os
 import json
 import random
-from typing import Optional, Annotated
+from typing import Any, Optional, Annotated
 from fastapi import FastAPI, Request, UploadFile, Response, HTTPException, Form, Depends
 from fastapi_throttle import RateLimiter
 from fastapi.responses import JSONResponse
-import redis.asyncio as redis
+import redis
+from rq import Queue
+from rq.registry import (
+    StartedJobRegistry,
+    FinishedJobRegistry,
+    FailedJobRegistry,
+    DeferredJobRegistry
+)
 from lyric_search.sources import private, cache as LyricsCache, redis_cache
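
Note: RQ's queues and registries require a synchronous redis.Redis connection, which is presumably why this hunk swaps redis.asyncio for the blocking client. A minimal sketch of the pattern (standalone, not the project's code; connection settings are illustrative):

import redis
from rq import Queue
from rq.registry import StartedJobRegistry

conn = redis.Redis()                   # blocking client; the asyncio client will not work here
queue = Queue("dls", connection=conn)  # "dls" mirrors the queue name used later in this commit
print(queue.count)                     # jobs waiting in the queue
print(StartedJobRegistry("dls", connection=conn).count)  # jobs currently executing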
@@ -22,7 +29,7 @@ class Misc(FastAPI):
         self.constants = constants
         self.lyr_cache = LyricsCache.Cache()
         self.redis_cache = redis_cache.RedisCache()
-        self.redis_client = redis.Redis(password=private.REDIS_PW)
+        self.redis_client: Any = redis.Redis(password=private.REDIS_PW)
         self.radio = radio
         self.activity_image: Optional[bytes] = None
         self.nos_json_path: str = os.path.join(
@@ -35,6 +42,7 @@ class Misc(FastAPI):
"widget/sqlite": self.homepage_sqlite_widget, "widget/sqlite": self.homepage_sqlite_widget,
"widget/lyrics": self.homepage_lyrics_widget, "widget/lyrics": self.homepage_lyrics_widget,
"widget/radio": self.homepage_radio_widget, "widget/radio": self.homepage_radio_widget,
"widget/rq": self.homepage_rq_widget,
"misc/get_activity_image": self.get_activity_image, "misc/get_activity_image": self.get_activity_image,
"misc/no": self.no, "misc/no": self.no,
} }
@@ -141,14 +149,14 @@ class Misc(FastAPI):
""" """
# Measure response time w/ test lyric search # Measure response time w/ test lyric search
time_start: float = time.time() # Start time for response_time time_start: float = time.time() # Start time for response_time
test_lyrics_result = await self.redis_client.ft().search( # noqa: F841 test_lyrics_result = self.redis_client.ft().search( # noqa: F841
"@artist: test @song: test" "@artist: test @song: test"
) )
time_end: float = time.time() time_end: float = time.time()
# End response time test # End response time test
total_keys = await self.redis_client.dbsize() total_keys = self.redis_client.dbsize()
response_time: float = time_end - time_start response_time: float = time_end - time_start
index_info = await self.redis_client.ft().info() index_info = self.redis_client.ft().info()
indexed_lyrics: int = index_info.get("num_docs") indexed_lyrics: int = index_info.get("num_docs")
return JSONResponse( return JSONResponse(
content={ content={
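
With the client now synchronous, the RediSearch calls in this handler drop their awaits; ft() returns a blocking search helper, so each call now holds the event loop for its duration. A small sketch of the blocking equivalents (standalone; connection settings are illustrative):

import redis

r = redis.Redis()
result = r.ft().search("@artist: test @song: test")  # ft() defaults to the index named "idx"
print(r.dbsize())                      # total keys in the current database
print(r.ft().info().get("num_docs"))   # documents in the search index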
@@ -159,6 +167,30 @@ class Misc(FastAPI):
             }
         )

+    async def homepage_rq_widget(self) -> JSONResponse:
+        """
+        Homepage RQ Widget Handler
+        """
+        queue_name = "dls"
+        queue = Queue(queue_name, connection=self.redis_client)
+        queued = queue.count
+        started = StartedJobRegistry(queue_name, connection=self.redis_client).count
+        failed = FailedJobRegistry(queue_name, connection=self.redis_client).count
+        finished = FinishedJobRegistry(queue_name, connection=self.redis_client).count
+        deferred = DeferredJobRegistry(queue_name, connection=self.redis_client).count
+        return JSONResponse(
+            content={
+                queue_name: {
+                    "queued": queued,
+                    "started": started,
+                    "failed": failed,
+                    "finished": finished,
+                    "deferred": deferred,
+                }
+            }
+        )
+
     async def homepage_sqlite_widget(self) -> JSONResponse:
         """
         Homepage SQLite Widget Handler
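
For reference, a hypothetical request against the new widget (the mount path and host are assumptions, and the counts are made up for illustration):

import requests

resp = requests.get("https://api.example.com/widget/rq")  # assumed route for homepage_rq_widget
print(resp.json())
# e.g. {"dls": {"queued": 2, "started": 1, "failed": 0, "finished": 37, "deferred": 0}}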

View File

@@ -76,14 +76,14 @@ class RIP(FastAPI):
         job_status: str | JobStatus = job.get_status()
         progress = job.meta.get("progress", 0)
         if progress == 100 and not job.meta.get("tarball"):
-            job_status = "compressing"
+            job_status = "Compressing"
         tracks_in = job.meta.get("tracks_in")
         tracks_out = len(job.meta.get("tracks", []))
         return {
             "id": job.id,
-            "status": job_status,
+            "status": job_status.title(),
             "result": job.result,
             "tarball": job.meta.get("tarball"),
             "enqueued_at": job.enqueued_at,
@@ -179,7 +179,7 @@ class RIP(FastAPI):
             retry=Retry(max=1, interval=[30]),
             meta={
                 "progress": 0,
-                "status": "queued",
+                "status": "Queued",
                 "target": target,
                 "tracks_in": len(track_ids),
                 "quality": data.quality,
@@ -189,7 +189,7 @@ class RIP(FastAPI):
         return JSONResponse(
             content={
                 "job_id": job.id,
-                "status": "queued",
+                "status": "Queued",
                 "target": job.meta.get("target", None),
                 "quality": job.meta.get("quality", "Unknown"),
             }
@@ -267,4 +267,10 @@ class RIP(FastAPI):
                 except Exception:
                     continue
+
+        # ---- Sort newest first ----
+        def job_sort_key(job):
+            return job.get("ended_at") or job.get("started_at") or job.get("enqueued_at") or 0
+
+        jobs_info.sort(key=job_sort_key, reverse=True)

         return {"jobs": jobs_info}

View File

@@ -66,6 +66,8 @@ class Cache:
                         confidence=row["confidence"],
                     )
             else:
+                if not sqlite_rows:
+                    return None
                 for row in sqlite_rows:
                     if row[0] == matched_id:
                         (_id, artist, song, lyrics, original_src) = row[:-1]

View File

@@ -101,7 +101,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.meta["tracks"] = [] # will hold per-track dicts job.meta["tracks"] = [] # will hold per-track dicts
job.meta["progress"] = 0 job.meta["progress"] = 0
job.meta["tarball"] = None job.meta["tarball"] = None
job.meta["status"] = "started" job.meta["status"] = "Started"
job.save_meta() job.save_meta()
except Exception as e: except Exception as e:
logging.warning("Failed to init job.meta: %s", e) logging.warning("Failed to init job.meta: %s", e)
@@ -123,7 +123,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
         for i, track_id in enumerate(track_list or []):
             track_info = {
                 "track_id": str(track_id),
-                "status": "pending",  # pending | success | failed
+                "status": "Pending",  # Pending | Success | Failed
                 "file_path": None,  # str | None
                 "error": None,  # str | None
                 "attempts": 0,  # int
@@ -177,7 +177,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
                     tmp_file = None  # consumed

                     # Track success
-                    track_info["status"] = "success"
+                    track_info["status"] = "Success"
                     track_info["file_path"] = str(final_file)
                     track_info["error"] = None
                     all_final_files.append(final_file)
@@ -193,7 +193,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
                     )
                     track_info["error"] = str(e)
                     if attempt >= MAX_RETRIES:
-                        track_info["status"] = "failed"
+                        track_info["status"] = "Failed"
                     # small backoff before next attempt (or next track)
                     await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
                 finally:
@@ -223,7 +223,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
         if job:
             try:
                 job.meta["tarball"] = None
-                job.meta["status"] = "failed"
+                job.meta["status"] = "Failed"
                 job.save_meta()
             except Exception:
                 pass
@@ -232,7 +232,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
         # Pick artist with the most tracks
         artist_counts: dict[str, int] = {}
         for t in per_track_meta:
-            if t["status"] == "success" and t.get("file_path"):
+            if t["status"] == "Success" and t.get("file_path"):
                 try:
                     artist = Path(t["file_path"]).relative_to(ROOT_DIR).parts[0]
                 except Exception:
@@ -256,7 +256,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
         if job:
             try:
-                job.meta["status"] = "compressing"
+                job.meta["status"] = "Compressing"
                 job.save_meta()
             except Exception:
                 pass
@@ -303,7 +303,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
         if job:
             job.meta["tarball"] = str(final_tarball)
             job.meta["progress"] = 100
-            job.meta["status"] = "completed"
+            job.meta["status"] = "Completed"
             job.save_meta()

         return [str(final_tarball)]
@@ -315,7 +315,7 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
         return loop.run_until_complete(process_tracks())
     except Exception as e:
         if job:
-            job.meta["status"] = "failed"
+            job.meta["status"] = "Failed"
             job.save_meta()
         logging.critical("Exception: %s", str(e))
     finally:
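
Since the capitalized labels now appear across both the endpoint and worker files, a possible follow-up (not part of this commit) is a shared str-valued enum so the two sides cannot drift:

import json
from enum import Enum

class StatusLabel(str, Enum):
    # single source of truth for the display labels written to job.meta
    QUEUED = "Queued"
    STARTED = "Started"
    PENDING = "Pending"
    SUCCESS = "Success"
    FAILED = "Failed"
    COMPRESSING = "Compressing"
    COMPLETED = "Completed"

print(json.dumps({"status": StatusLabel.COMPLETED}))  # {"status": "Completed"}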