Compare commits

...

15 Commits

SHA1 Message Date
3b74333b96 misc 2025-09-12 22:39:59 -04:00
f6d4ed57f3 misc 2025-09-09 15:50:13 -04:00
a57173b90a TRip: change file naming, use pigz for faster tarball creation 2025-08-29 10:23:06 -04:00
a11748775e TRip: capitalize RQ job statuses in related endpoints, order job list, other: minor/typing 2025-08-23 08:20:32 -04:00
a8d089c0fe minor 2025-08-21 15:35:10 -04:00
dd8d07b2f0 formatting 2025-08-21 15:08:13 -04:00
22eaa2260e another commit without a list of specific changes! (misc) 2025-08-21 15:06:56 -04:00
e0f64f6773 misc 2025-08-20 15:58:07 -04:00
81f79dea1e misc 2025-08-20 07:32:57 -04:00
3cebe14674 misc / TRip: folder structure / tar naming 2025-08-15 14:58:06 -04:00
27fa1f78ed misc 2025-08-15 14:15:13 -04:00
0cd4a71db2 formatting / RQ tuning 2025-08-15 13:39:27 -04:00
93050ec6cf misc / RQ bulk downloads for TRip 2025-08-15 13:31:15 -04:00
72a7734152 minor 2025-08-11 15:06:58 -04:00
4cbd0fb934 docstrings 2025-08-11 14:06:42 -04:00
16 changed files with 1003 additions and 101 deletions

.gitignore vendored (+3)
View File

@@ -25,5 +25,8 @@ endpoints/auth.py
endpoints/radio2
endpoints/radio2/**
hash_password.py
up.py
job_review.py
check_missing.py
**/auth/*
.gitignore

View File

@@ -9,9 +9,10 @@ from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from lyric_search.sources import redis_cache
logging.basicConfig(level=logging.INFO)
logging.getLogger("aiosqlite").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
loop = asyncio.get_event_loop()
app = FastAPI(
@@ -22,7 +23,6 @@ app = FastAPI(
loop=loop,
)
constants = importlib.import_module("constants").Constants()
util = importlib.import_module("util").Utilities(app, constants)

View File

@@ -1,6 +1,9 @@
from typing import Optional
from typing import Literal
from pydantic import BaseModel
Station = Literal["main", "rock", "rap", "electronic", "pop"]
"""
LastFM
"""
@@ -96,25 +99,6 @@ class ValidYTSearchRequest(BaseModel):
t: str = "rick astley - never gonna give you up"
"""
XC
"""
class ValidXCRequest(BaseModel):
"""
- **key**: valid XC API key
- **bid**: bot id
- **cmd**: bot command
- **data**: command data
"""
key: str
bid: int
cmd: str
data: Optional[dict]
"""
Transcriptions
"""
@@ -211,7 +195,7 @@ class ValidRadioSongRequest(BaseModel):
song: Optional[str] = None
artistsong: Optional[str] = None
alsoSkip: Optional[bool] = False
station: str = "main"
station: Station = "main"
class ValidRadioTypeaheadRequest(BaseModel):
@@ -241,7 +225,7 @@ class ValidRadioNextRequest(BaseModel):
key: str
skipTo: Optional[str] = None
station: str = "main"
station: Station = "main"
class ValidRadioReshuffleRequest(ValidRadioNextRequest):
@@ -262,7 +246,7 @@ class ValidRadioQueueRequest(BaseModel):
draw: Optional[int] = 1
start: Optional[int] = 0
search: Optional[str] = None
station: str = "main"
station: Station = "main"
class ValidRadioQueueShiftRequest(BaseModel):
@@ -276,7 +260,7 @@ class ValidRadioQueueShiftRequest(BaseModel):
key: str
uuid: str
next: Optional[bool] = False
station: str = "main"
station: Station = "main"
class ValidRadioQueueRemovalRequest(BaseModel):
@@ -288,4 +272,4 @@ class ValidRadioQueueRemovalRequest(BaseModel):
key: str
uuid: str
station: str = "main"
station: Station = "main"
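The new Station alias narrows what used to be free-form strings; a minimal sketch of the validation this buys, assuming Pydantic v2 (the model shape is reused from above, the rest is illustrative):

from typing import Literal, Optional
from pydantic import BaseModel, ValidationError

Station = Literal["main", "rock", "rap", "electronic", "pop"]

class ValidRadioSongRequest(BaseModel):
    key: str
    artistsong: Optional[str] = None
    station: Station = "main"

ValidRadioSongRequest(key="k", station="rock")       # accepted
try:
    ValidRadioSongRequest(key="k", station="jazz")   # rejected at parse time
except ValidationError as exc:
    print(exc.errors()[0]["type"])                   # literal_error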

View File

@@ -30,7 +30,7 @@ class Meme(FastAPI):
f"/{endpoint}",
handler,
methods=["GET"],
include_in_schema=True,
include_in_schema=False,
dependencies=dependencies,
)

View File

@@ -3,11 +3,18 @@ import time
import os
import json
import random
from typing import Optional, Annotated
from typing import Any, Optional, Annotated
from fastapi import FastAPI, Request, UploadFile, Response, HTTPException, Form, Depends
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
import redis.asyncio as redis
import redis
from rq import Queue
from rq.registry import (
StartedJobRegistry,
FinishedJobRegistry,
FailedJobRegistry,
DeferredJobRegistry,
)
from lyric_search.sources import private, cache as LyricsCache, redis_cache
@@ -22,7 +29,7 @@ class Misc(FastAPI):
self.constants = constants
self.lyr_cache = LyricsCache.Cache()
self.redis_cache = redis_cache.RedisCache()
self.redis_client = redis.Redis(password=private.REDIS_PW)
self.redis_client: Any = redis.Redis(password=private.REDIS_PW)
self.radio = radio
self.activity_image: Optional[bytes] = None
self.nos_json_path: str = os.path.join(
@@ -35,6 +42,7 @@ class Misc(FastAPI):
"widget/sqlite": self.homepage_sqlite_widget,
"widget/lyrics": self.homepage_lyrics_widget,
"widget/radio": self.homepage_radio_widget,
"widget/rq": self.homepage_rq_widget,
"misc/get_activity_image": self.get_activity_image,
"misc/no": self.no,
}
@@ -141,14 +149,14 @@ class Misc(FastAPI):
"""
# Measure response time w/ test lyric search
time_start: float = time.time() # Start time for response_time
test_lyrics_result = await self.redis_client.ft().search( # noqa: F841
test_lyrics_result = self.redis_client.ft().search( # noqa: F841
"@artist: test @song: test"
)
time_end: float = time.time()
# End response time test
total_keys = await self.redis_client.dbsize()
total_keys = self.redis_client.dbsize()
response_time: float = time_end - time_start
index_info = await self.redis_client.ft().info()
index_info = self.redis_client.ft().info()
indexed_lyrics: int = index_info.get("num_docs")
return JSONResponse(
content={
@@ -159,6 +167,30 @@ class Misc(FastAPI):
}
)
async def homepage_rq_widget(self) -> JSONResponse:
"""
Homepage RQ Widget Handler
"""
queue_name = "dls"
queue = Queue(queue_name, connection=self.redis_client)
queued = queue.count
started = StartedJobRegistry(queue_name, connection=self.redis_client).count
failed = FailedJobRegistry(queue_name, connection=self.redis_client).count
finished = FinishedJobRegistry(queue_name, connection=self.redis_client).count
deferred = DeferredJobRegistry(queue_name, connection=self.redis_client).count
return JSONResponse(
content={
queue_name: {
"queued": queued,
"started": started,
"failed": failed,
"finished": finished,
"deferred": deferred,
}
}
)
async def homepage_sqlite_widget(self) -> JSONResponse:
"""
Homepage SQLite Widget Handler
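The move from redis.asyncio to the synchronous redis client (and the dropped awaits) lines up with RQ, which expects a synchronous connection. A standalone sketch of the registry-count pattern the widget uses; the queue name dls and the password argument come from the diff, the rest is illustrative:

import redis
from rq import Queue
from rq.registry import (
    StartedJobRegistry,
    FinishedJobRegistry,
    FailedJobRegistry,
    DeferredJobRegistry,
)

conn = redis.Redis(password="<REDIS_PW>")  # sync client; RQ does not accept redis.asyncio
name = "dls"
counts = {
    "queued": Queue(name, connection=conn).count,
    "started": StartedJobRegistry(name, connection=conn).count,
    "failed": FailedJobRegistry(name, connection=conn).count,
    "finished": FinishedJobRegistry(name, connection=conn).count,
    "deferred": DeferredJobRegistry(name, connection=conn).count,
}
print(counts)  # shape mirrors the widget's JSON payload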

View File

@@ -10,6 +10,7 @@ from .constructors import (
ValidRadioSongRequest,
ValidRadioTypeaheadRequest,
ValidRadioQueueRequest,
Station
)
from utils import radio_util
from typing import Optional
@@ -43,23 +44,19 @@ class Radio(FastAPI):
"radio/reshuffle": self.radio_reshuffle,
"radio/queue_remove": self.radio_queue_remove,
"radio/ls._next_": self.radio_get_next,
"radio/album_art": self.album_art_handler,
}
for endpoint, handler in self.endpoints.items():
methods: list[str] = ["POST"]
if endpoint == "radio/album_art":
methods = ["GET"]
app.add_api_route(
f"/{endpoint}", handler, methods=["POST"], include_in_schema=True,
f"/{endpoint}", handler, methods=methods, include_in_schema=False,
dependencies=[Depends(
RateLimiter(times=10, seconds=2))] if not endpoint == "radio/np" else None,
)
# NOTE: Not in loop because method is GET for this endpoint
app.add_api_route(
"/radio/album_art",
self.album_art_handler,
methods=["GET"],
include_in_schema=True,
)
app.add_event_handler("startup", self.on_start)
async def on_start(self) -> None:
@@ -135,21 +132,35 @@ class Radio(FastAPI):
"""
Get current play queue (paged, 20 results per page)
"""
if not (data and data.station):
return JSONResponse(status_code=500,
content={
"err": True,
"errorText": "Invalid request.",
})
search: Optional[str] = None
draw: int = 0
if isinstance(data, ValidRadioQueueRequest):
search = data.search
draw = data.draw
start: int = int(data.start)
draw = data.draw or 0
start: int = int(data.start or 0)
end: int = start + 20
else:
start: int = 0
end: int = 20
orig_queue: list[dict] = self.radio_util.active_playlist[data.station]
if not search:
queue_full: list = orig_queue
queue_full: Optional[list] = orig_queue
else:
queue_full: list = self.radio_util.datatables_search(data.search, data.station)
queue_full = self.radio_util.datatables_search(search, data.station)
if not queue_full:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "No queue found.",
}
)
queue: list = queue_full[start:end]
queue_out: list[dict] = []
for x, item in enumerate(queue):
@@ -240,7 +251,7 @@ class Radio(FastAPI):
async def album_art_handler(
self, request: Request, track_id: Optional[int] = None,
station: Optional[str] = "main"
station: Station = "main"
) -> Response:
"""
Get album art; the optional parameter track_id may be specified.
@@ -251,6 +262,13 @@ class Radio(FastAPI):
try:
if not track_id:
track_id = self.radio_util.now_playing[station].get("id")
if not track_id:
# Still no track ID
return JSONResponse(status_code=500,
content={
"err": True,
"errorText": "Invalid request",
})
logging.debug("Seeking album art with trackId: %s", track_id)
album_art: Optional[bytes] = self.radio_util.get_album_art(
track_id=track_id
@@ -269,7 +287,7 @@ class Radio(FastAPI):
)
async def radio_now_playing(self, request: Request,
station: Optional[str] = "main") -> JSONResponse:
station: Station = "main") -> JSONResponse:
"""
Get currently playing track info
- **station**: default "main"
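The registration refactor above folds the GET-only album_art route into the main loop by picking the method list per endpoint; a minimal sketch of the same pattern with stand-in handlers:

from fastapi import FastAPI

app = FastAPI()

async def np_handler() -> dict:         # stand-in for radio_now_playing
    return {}

async def album_art_handler() -> dict:  # stand-in; served via GET
    return {}

endpoints = {"radio/np": np_handler, "radio/album_art": album_art_handler}
for endpoint, handler in endpoints.items():
    methods = ["GET"] if endpoint == "radio/album_art" else ["POST"]
    app.add_api_route(f"/{endpoint}", handler, methods=methods, include_in_schema=False)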

View File

@@ -41,16 +41,16 @@ class RandMsg(FastAPI):
db_rand_selected: int = 9
db_rand_selected = random.choice([3])
title_attr: str = "Unknown"
randmsg_db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None
match db_rand_selected:
case 0:
randmsg_db_path: Union[str, LiteralString] = os.path.join(
randmsg_db_path = os.path.join(
"/usr/local/share", "sqlite_dbs", "qajoke.db"
) # For qajoke db
db_query: str = (
"SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
db_query = "SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
|| answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db
)
title_attr = "QA Joke DB"
case 1 | 9:
randmsg_db_path = os.path.join(
@@ -90,9 +90,20 @@ class RandMsg(FastAPI):
WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1"""
title_attr = "r/jokes DB"
if not randmsg_db_path:
return JSONResponse(
content={
"err": True,
}
)
async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: sqlite3.Row = await _cursor.fetchone()
if not isinstance(_cursor, sqlite3.Cursor):
return JSONResponse(content={"err": True})
result: Optional[sqlite3.Row] = await _cursor.fetchone()
if not result:
return JSONResponse(content={"err": True})
(result_id, result_msg) = result
result_msg = result_msg.strip()
return JSONResponse(
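The added guards reflect aiosqlite's contract: fetchone() returns None on an empty result. A self-contained sketch of the guarded shape, with an in-memory database standing in for the joke DBs:

import asyncio
import aiosqlite

async def random_row() -> tuple | None:
    async with aiosqlite.connect(":memory:") as db:
        await db.execute("CREATE TABLE jokes (id INTEGER, msg TEXT)")
        await db.execute("INSERT INTO jokes VALUES (1, 'Q/A')")
        async with db.execute(
            "SELECT id, msg FROM jokes ORDER BY RANDOM() LIMIT 1"
        ) as cursor:
            row = await cursor.fetchone()
            if not row:        # empty table: fetchone() returns None
                return None
            return tuple(row)

print(asyncio.run(random_row()))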

View File

@@ -4,8 +4,27 @@ from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from utils.sr_wrapper import SRUtil
from auth.deps import get_current_user
from redis import Redis
from rq import Queue, Retry
from rq.job import Job
from rq.job import JobStatus
from rq.registry import (
StartedJobRegistry,
DeferredJobRegistry,
FinishedJobRegistry,
FailedJobRegistry,
ScheduledJobRegistry,
)
from utils.rip_background import bulk_download
from lyric_search.sources import private
from typing import Literal
from pydantic import BaseModel
logging.getLogger().setLevel(logging.INFO)
class ValidBulkFetchRequest(BaseModel):
track_ids: list[int]
target: str
quality: Literal["FLAC", "Lossy"] = "FLAC"
class RIP(FastAPI):
@@ -18,12 +37,28 @@ class RIP(FastAPI):
self.util = my_util
self.trip_util = SRUtil()
self.constants = constants
self.redis_conn = Redis(
host="localhost",
port=6379,
db=0,
password=private.REDIS_PW,
)
self.task_queue = Queue(
"dls",
connection=self.redis_conn,
default_timeout=14400,
default_result_ttl=-1,
default_failure_ttl=86400,
)
self.endpoints: dict = {
"trip/get_artists_by_name": self.artists_by_name_handler,
"trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler,
"trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler,
"trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler,
"trip/get_track_by_id/{track_id:path}": self.track_by_id_handler,
"trip/bulk_fetch": self.bulk_fetch_handler,
"trip/job/{job_id:path}": self.job_status_handler,
"trip/jobs/list": self.job_list_handler,
}
for endpoint, handler in self.endpoints.items():
@@ -31,11 +66,37 @@ class RIP(FastAPI):
app.add_api_route(
f"/{endpoint}",
handler,
methods=["GET"],
methods=["GET"] if endpoint != "trip/bulk_fetch" else ["POST"],
include_in_schema=True,
dependencies=dependencies,
)
def _format_job(self, job: Job):
"""Helper to normalize job data into JSON."""
job_status: str | JobStatus = job.get_status()
progress = job.meta.get("progress", 0)
if progress == 100 and not job.meta.get("tarball"):
job_status = "Compressing"
tracks_in = job.meta.get("tracks_in")
tracks_out = len(job.meta.get("tracks", []))
return {
"id": job.id,
"status": job_status.title(),
"result": job.result,
"tarball": job.meta.get("tarball"),
"enqueued_at": job.enqueued_at,
"started_at": job.started_at,
"ended_at": job.ended_at,
"progress": progress,
"tracks": f"{tracks_out} / {tracks_in}"
if isinstance(tracks_in, int)
else tracks_out,
"target": job.meta.get("target"),
"quality": job.meta.get("quality", "Unknown"),
}
async def artists_by_name_handler(
self, artist: str, request: Request, user=Depends(get_current_user)
) -> Response:
@@ -55,10 +116,14 @@ class RIP(FastAPI):
return JSONResponse(content=albums)
async def tracks_by_album_id_handler(
self, album_id: int, request: Request, user=Depends(get_current_user)
self,
album_id: int,
request: Request,
user=Depends(get_current_user),
quality: Literal["FLAC", "Lossy"] = "FLAC",
) -> Response:
"""Get tracks by album id"""
tracks = await self.trip_util.get_tracks_by_album_id(album_id)
tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality)
if not tracks:
return Response(status_code=404, content="Not Found")
return JSONResponse(content=tracks)
@@ -74,10 +139,143 @@ class RIP(FastAPI):
return JSONResponse(content=tracks)
async def track_by_id_handler(
self, track_id: int, request: Request, user=Depends(get_current_user)
self,
track_id: int,
request: Request,
quality: Literal["FLAC", "Lossy"] = "FLAC",
user=Depends(get_current_user),
) -> Response:
"""Get track by ID"""
track = await self.trip_util.get_stream_url_by_track_id(track_id)
track = await self.trip_util.get_stream_url_by_track_id(track_id, quality)
if not track:
return Response(status_code=404, content="Not found")
return JSONResponse(content={"stream_url": track})
async def bulk_fetch_handler(
self,
data: ValidBulkFetchRequest,
request: Request,
user=Depends(get_current_user),
) -> Response:
"""Bulk fetch a list of track IDs"""
if not data or not data.track_ids or not data.target:
return JSONResponse(
content={
"err": True,
"errorText": "Invalid data",
}
)
track_ids = data.track_ids
target = data.target
job = self.task_queue.enqueue(
bulk_download,
args=(
track_ids,
data.quality,
),
job_timeout=14400,
failure_ttl=86400,
result_ttl=-1,
retry=Retry(max=1, interval=[30]),
meta={
"progress": 0,
"status": "Queued",
"target": target,
"tracks_in": len(track_ids),
"quality": data.quality,
},
)
self.redis_conn.lpush("enqueued_job_ids", job.id)
return JSONResponse(
content={
"job_id": job.id,
"status": "Queued",
"target": job.meta.get("target", None),
"quality": job.meta.get("quality", "Unknown"),
}
)
async def job_status_handler(
self, job_id: str, request: Request, user=Depends(get_current_user)
):
"""Get status and result of a single job"""
job = None
try:
# Try direct fetch first
job = Job.fetch(job_id, connection=self.redis_conn)
except Exception:
# If not found, try registries explicitly (in case fetch failed because the job left the queue)
registries = [
StartedJobRegistry(queue=self.task_queue),
FinishedJobRegistry(queue=self.task_queue),
FailedJobRegistry(queue=self.task_queue),
DeferredJobRegistry(queue=self.task_queue),
ScheduledJobRegistry(queue=self.task_queue),
]
for registry in registries:
if job_id in registry.get_job_ids():
try:
job = Job.fetch(job_id, connection=self.redis_conn)
except Exception:
pass
break
if job is None:
return JSONResponse({"error": "Job not found"}, status_code=404)
return self._format_job(job)
async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
"""List all jobs across all registries (queued, started, finished, failed, etc)."""
jobs_info = []
seen = set()
# 1. Jobs still waiting in queue
for job in self.task_queue.jobs:
jobs_info.append(self._format_job(job))
seen.add(job.id)
# 2. Jobs in Started/Finished/Failed/Deferred registries
registries = [
StartedJobRegistry(queue=self.task_queue),
FinishedJobRegistry(queue=self.task_queue),
FailedJobRegistry(queue=self.task_queue),
DeferredJobRegistry(queue=self.task_queue),
]
for registry in registries:
for jid in registry.get_job_ids():
if jid in seen:
continue
try:
job = Job.fetch(jid, connection=self.redis_conn)
jobs_info.append(self._format_job(job))
seen.add(job.id)
except Exception:
continue # job might have been cleaned up
# 3. Jobs tracked in your custom enqueued_job_ids list
job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1)
for jid_bytes in job_ids: # type: ignore
jid = jid_bytes.decode()
if jid in seen:
continue
try:
job = Job.fetch(jid, connection=self.redis_conn)
jobs_info.append(self._format_job(job))
seen.add(job.id)
except Exception:
continue
# ---- Sort newest first ----
def job_sort_key(job):
return (
job.get("ended_at")
or job.get("started_at")
or job.get("enqueued_at")
or 0
)
jobs_info.sort(key=job_sort_key, reverse=True)
return {"jobs": jobs_info}

View File

@@ -1,9 +1,9 @@
import os
import aiosqlite as sqlite3
from fastapi import FastAPI, Depends
from fastapi import FastAPI, Depends, Response
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union
from typing import Optional, LiteralString, Union, Iterable, cast
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
@@ -88,7 +88,7 @@ class Transcriptions(FastAPI):
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: list[tuple] = await _cursor.fetchall()
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
return JSONResponse(
content={
"show_title": show_title,
@@ -104,7 +104,7 @@ class Transcriptions(FastAPI):
async def get_episode_lines_handler(
self, data: ValidShowEpisodeLineRequest
) -> JSONResponse:
) -> Response:
"""
Get lines for a particular episode
- **s**: Show ID to query
@@ -138,8 +138,14 @@ class Transcriptions(FastAPI):
async with sqlite3.connect(database=db_path, timeout=1) as _db:
params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor:
result: list[tuple] = await _cursor.fetchall()
first_result: tuple = result[0]
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
result_list = cast(list[sqlite3.Row], result)
if not result_list:
return Response(
status_code=404,
content="Not found",
)
first_result: sqlite3.Row = result_list[0]
return JSONResponse(
content={
"episode_id": episode_id,

View File

@@ -66,6 +66,8 @@ class Cache:
confidence=row["confidence"],
)
else:
if not sqlite_rows:
return None
for row in sqlite_rows:
if row[0] == matched_id:
(_id, artist, song, lyrics, original_src) = row[:-1]

View File

@@ -45,8 +45,8 @@ class Genius:
Optional[LyricsResult]: The result, if found - None otherwise.
"""
try:
artist: str = artist.strip().lower()
song: str = song.strip().lower()
artist = artist.strip().lower()
song = song.strip().lower()
time_start: float = time.time()
logging.info("Searching %s - %s on %s", artist, song, self.label)
search_term: str = f"{artist}%20{song}"
@@ -56,7 +56,6 @@ class Genius:
f"{self.genius_search_url}{search_term}",
timeout=self.timeout,
headers=self.headers,
verify_ssl=False,
proxy=private.GENIUS_PROXY,
) as request:
request.raise_for_status()
@@ -113,7 +112,6 @@ class Genius:
scrape_url,
timeout=self.timeout,
headers=self.headers,
verify_ssl=False,
proxy=private.GENIUS_PROXY,
) as scrape_request:
scrape_request.raise_for_status()

View File

@@ -26,6 +26,7 @@ class MemeUtil:
bool
"""
# Accepts either bytes or a BytesIO-like object
signature = None
if isinstance(buffer, io.BytesIO):
if hasattr(buffer, "read") and hasattr(buffer, "seek"):
pos = buffer.tell()
@@ -153,6 +154,8 @@ class MemeUtil:
query: str = "SELECT count(id) AS count FROM memes"
async with await db_conn.execute(query) as db_cursor:
result = await db_cursor.fetchone()
if not result:
return None
count = result["count"]
if not isinstance(count, int):
return None

View File

@@ -51,7 +51,7 @@ class RadioUtil:
'main': self.constants.RADIO_DB_QUERY,
'rap': self.constants.RADIO_DB_QUERY_RAP,
'pop': self.constants.RADIO_DB_QUERY_POP,
'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
# 'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
'rock': self.constants.RADIO_DB_QUERY_ROCK,
'electronic': self.constants.RADIO_DB_QUERY_ELECTRONIC,
}
@@ -77,7 +77,6 @@ class RadioUtil:
"rock",
"rap",
"electronic",
"classical",
"pop",
]
self.active_playlist: dict[str, list[dict]] = {}
@@ -152,10 +151,10 @@ class RadioUtil:
filter = filter.strip().lower()
matched: list[dict] = []
for item in self.active_playlist[station]:
artist: str = item.get("artist", None)
song: str = item.get("song", None)
artistsong: str = item.get("artistsong", None)
album: str = item.get("album", None)
artist: str = item.get("artist", "")
song: str = item.get("song", "")
artistsong: str = item.get("artistsong", "")
album: str = item.get("album", "")
if not artist or not song or not artistsong:
continue
if non_alnum.sub("", filter) in non_alnum.sub("", artistsong).lower():
@@ -201,6 +200,8 @@ class RadioUtil:
search_song = song
if not artistsong:
artistsong = f"{search_artist} - {search_song}"
if not search_artist or not search_song or not artistsong:
raise RadioException("No query provided")
search_params = (
search_artist.lower(),
search_song.lower(),
@@ -280,6 +281,8 @@ class RadioUtil:
"""
try:
added_rows: int = 0
artist = None
genre = None
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
for pair in pairs:
try:
@@ -388,11 +391,10 @@ class RadioUtil:
for playlist in self.playlists:
playlist_redis_key: str = f"playlist:{playlist}"
_playlist = await self.redis_client.json().get(playlist_redis_key)
_playlist = await self.redis_client.json().get(playlist_redis_key) # type: ignore
if playlist not in self.active_playlist.keys():
self.active_playlist[playlist] = []
if not playlist == "rock":
random.shuffle(_playlist) # Temp/for Cocteau Twins
random.shuffle(_playlist)
self.active_playlist[playlist] = [
{
"uuid": str(uuid().hex),
@@ -418,7 +420,7 @@ class RadioUtil:
logging.info("Removing duplicate tracks...")
dedupe_processed = []
for item in self.active_playlist[playlist]:
artistsongabc: str = non_alnum.sub("", item.get("artistsong", None))
artistsongabc: str = non_alnum.sub("", item.get("artistsong", ""))
if not artistsongabc:
logging.info("Missing artistsong: %s", item)
continue
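The dedupe pass keys each track on its artistsong with non-alphanumerics stripped; a small sketch of that normalization (the regex is an assumption standing in for the module-level non_alnum):

import re

non_alnum = re.compile(r"[\W_]")  # assumed to strip everything but letters/digits

def dedupe(playlist: list[dict]) -> list[dict]:
    seen: set[str] = set()
    out: list[dict] = []
    for item in playlist:
        key = non_alnum.sub("", item.get("artistsong", "")).lower()
        if not key or key in seen:
            continue  # drop blanks and duplicates
        seen.add(key)
        out.append(item)
    return out

print(len(dedupe([{"artistsong": "Artist - Song"}, {"artistsong": "artist song"}])))  # 1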

utils/rip_background.py (new file, +401)
View File

@@ -0,0 +1,401 @@
import logging
import asyncio
import random
import os
import tarfile
import traceback
import uuid
import subprocess
import shutil
import re
from pathlib import Path
from urllib.parse import urlparse, unquote
import aiohttp
from datetime import datetime
from mediafile import MediaFile # type: ignore[import]
from rq import get_current_job
from utils.sr_wrapper import SRUtil
# ---------- Config ----------
ROOT_DIR = Path("/storage/music2")
MAX_RETRIES = 5
THROTTLE_MIN = 1.0
THROTTLE_MAX = 3.5
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/116.0.5845.97 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Connection": "keep-alive",
}
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
sr = SRUtil()
# ---------- Helpers ----------
def tag_with_mediafile(file_path: str, meta: dict):
f = MediaFile(file_path)
# --- Helper to safely set textual/number fields ---
def safe_set(attr, value, default=None, cast=None):
if value is None:
value = default
if value is not None:
if cast is not None:
setattr(f, attr, cast(value))
else:
setattr(f, attr, str(value))
# --- Basic textual metadata ---
safe_set("title", meta.get("title"), default="Unknown Title")
safe_set("artist", meta.get("artist"), default="Unknown Artist")
safe_set("albumartist", meta.get("album_artist"), default="Unknown Artist")
safe_set("album", meta.get("album"), default="Unknown Album")
safe_set("track", meta.get("track_number"), default=0, cast=int)
safe_set("disc", meta.get("disc_number"), default=0, cast=int)
safe_set("isrc", meta.get("isrc"), default="")
safe_set("bpm", meta.get("bpm"), default=0, cast=int)
# --- Release date ---
release_date_str = meta.get("release_date")
release_date_obj = None
if release_date_str:
try:
release_date_obj = datetime.fromisoformat(release_date_str).date()
except ValueError:
try:
# fallback if only year string
release_date_obj = datetime(int(release_date_str[:4]), 1, 1).date()
except Exception:
pass
if release_date_obj:
f.date = release_date_obj
# --- Save all tags ---
f.save()
def cleanup_empty_dirs(root: Path):
"""
Recursively remove any directories under root that contain no files
(empty or only empty subdirectories).
"""
for dirpath, dirnames, filenames in os.walk(root, topdown=False):
p = Path(dirpath)
has_file = any(f.is_file() for f in p.rglob("*"))
if not has_file:
try:
p.rmdir()
except Exception:
pass
def sanitize_filename(name: str) -> str:
"""Make a string safe for file/dir names."""
if not name:
return "Unknown"
name = name.replace("/", "-").replace("\\", "-")
name = re.sub(r'[<>:"|?*\x00-\x1F]', "", name)
name = name.strip().strip(".")
name = re.sub(r"\s+", " ", name)
return name[:180] or "Unknown"
def ensure_unique_path(p: Path) -> Path:
"""
Ensure the given file or directory path is unique *within its parent folder*.
Only appends (2), (3)... if a real conflict exists in that folder.
"""
parent = p.parent
stem, suffix = p.stem, p.suffix
existing = {f.name for f in parent.glob(f"*{suffix}") if f.is_file()}
candidate = f"{stem}{suffix}"
if candidate not in existing:
return parent / candidate
counter = 2
while True:
candidate = f"{stem} ({counter}){suffix}"
if candidate not in existing:
return parent / candidate
counter += 1
# ---------- Job ----------
def bulk_download(track_list: list, quality: str = "FLAC"):
"""
RQ job:
- fetches stream URLs
- downloads with retries + throttling
- uses SR metadata to name/organize files
- creates ONE tarball for all tracks
- returns [tarball_path]
"""
job = get_current_job()
job_id = job.id if job else uuid.uuid4().hex
staging_root = ROOT_DIR / job_id
if job:
try:
job.meta["track_ids"] = [str(t) for t in (track_list or [])]
job.meta["tracks"] = []
job.meta["progress"] = 0
job.meta["tarball"] = None
job.meta["status"] = "Started"
job.save_meta()
except Exception as e:
logging.warning("Failed to init job.meta: %s", e)
async def process_tracks():
per_track_meta = []
all_final_files = []
all_artists = set()
(ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
async with aiohttp.ClientSession(headers=HEADERS) as session:
total = len(track_list or [])
logging.critical("Total tracks to process: %s", total)
if job:
job.meta["progress"] = 0
job.save_meta()
for i, track_id in enumerate(track_list or []):
track_info = {
"track_id": str(track_id),
"status": "Pending",
"file_path": None,
"error": None,
"attempts": 0,
}
attempt = 0
while attempt < MAX_RETRIES:
tmp_file = None
attempt += 1
track_info["attempts"] = attempt
try:
url = await sr.get_stream_url_by_track_id(track_id, quality)
if not url:
raise RuntimeError("No stream URL")
parsed = urlparse(url)
clean_path = unquote(parsed.path)
ext = Path(clean_path).suffix or ".mp3"
tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")
async with session.get(url) as resp:
resp.raise_for_status()
with open(tmp_file, "wb") as f:
async for chunk in resp.content.iter_chunked(64 * 1024):
f.write(chunk)
md = await sr.get_metadata_by_track_id(track_id) or {}
logging.info("Metadata for %s: %s", track_id, md)
artist_raw = md.get("artist") or "Unknown Artist"
album_raw = md.get("album") or "Unknown Album"
title_raw = md.get("title") or f"Track {track_id}"
artist = sanitize_filename(artist_raw)
album = sanitize_filename(album_raw)
title = sanitize_filename(title_raw)
all_artists.add(artist)
artist_dir = staging_root / artist
album_dir = artist_dir / album
album_dir.mkdir(parents=True, exist_ok=True)
final_file = ensure_unique_path(album_dir / f"{title}{ext}")
tag_with_mediafile(str(tmp_file), md)
tmp_file.rename(final_file)
tmp_file = None
track_info["status"] = "Success"
track_info["file_path"] = str(final_file)
track_info["error"] = None
all_final_files.append(final_file)
if job:
job.meta["progress"] = int(((i + 1) / total) * 100)
job.save_meta()
break
except aiohttp.ClientResponseError as e:
if e.status == 429:
wait_time = min(60, 2**attempt) # exponential up to 60s
logging.warning(
"Rate limited (429). Sleeping %s seconds", wait_time
)
await asyncio.sleep(wait_time)
else:
await asyncio.sleep(
random.uniform(THROTTLE_MIN, THROTTLE_MAX)
)
except Exception as e:
logging.error(
"Track %s attempt %s failed: %s", track_id, attempt, e
)
traceback.print_exc()
track_info["error"] = str(e)
if attempt >= MAX_RETRIES:
track_info["status"] = "Failed"
await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
finally:
try:
if tmp_file and tmp_file.exists():
tmp_file.unlink()
except Exception:
pass
per_track_meta.append(track_info)
if job:
try:
job.meta["tracks"] = per_track_meta
job.save_meta()
except Exception as e:
logging.warning(
"Failed to update job.meta after track %s: %s", track_id, e
)
await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
if not all_final_files:
if job:
try:
job.meta["tarball"] = None
job.meta["status"] = "Failed"
job.save_meta()
except Exception:
pass
return []
artist_counts: dict[str, int] = {}
for t in per_track_meta:
if t["status"] == "Success" and t.get("file_path"):
try:
artist = Path(t["file_path"]).relative_to(staging_root).parts[0]
except Exception:
artist = "Unknown Artist"
artist_counts[artist] = artist_counts.get(artist, 0) + 1
if artist_counts:
top_artist = sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[
0
][0]
else:
top_artist = "Unknown Artist"
combined_artist = sanitize_filename(top_artist)
staged_tarball = staging_root / f"{combined_artist}.tar.gz"
# Ensure uniqueness within the parent folder (Windows-style " (2)" suffixes)
counter = 1
base_name = staged_tarball.stem
while staged_tarball.exists():
counter += 1
staged_tarball = staging_root / f"{base_name} ({counter}).tar.gz"
final_tarball = ROOT_DIR / "completed" / quality / staged_tarball.name
final_tarball.parent.mkdir(parents=True, exist_ok=True)
if job:
try:
job.meta["status"] = "Compressing"
job.save_meta()
except Exception:
pass
logging.info("Creating tarball: %s", staged_tarball)
def _create_tar_sync():
try:
subprocess.run(
[
"tar",
"-I",
"pigz -9",
"-cf",
str(staged_tarball),
"-C",
str(staging_root),
]
+ [str(f.relative_to(staging_root)) for f in all_final_files],
check=True,
)
for f in all_final_files:
try:
os.remove(f)
except Exception:
pass
except (FileNotFoundError, subprocess.CalledProcessError):
logging.warning("pigz not available, falling back to tarfile (slower).")
with tarfile.open(staged_tarball, "w:gz") as tar:
for f in all_final_files:
try:
arcname = f.relative_to(staging_root)
except ValueError:
arcname = f.name
tar.add(f, arcname=str(arcname))
try:
os.remove(f)
except Exception:
pass
await asyncio.to_thread(_create_tar_sync)
if not staged_tarball.exists():
logging.error("Tarball was not created: %s", staged_tarball)
if job:
try:
job.meta["status"] = "compress_failed"
job.save_meta()
except Exception:
pass
return []
logging.critical("Tarball created: %s", staged_tarball)
try:
staged_tarball.rename(final_tarball)
except Exception:
shutil.move(str(staged_tarball), str(final_tarball))
logging.critical("Tarball finalized: %s", final_tarball)
await asyncio.to_thread(shutil.rmtree, staging_root, ignore_errors=True)
if job:
job.meta["tarball"] = str(final_tarball)
job.meta["progress"] = 100
job.meta["status"] = "Completed"
job.save_meta()
return [str(final_tarball)]
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(process_tracks())
except Exception as e:
if job:
job.meta["status"] = "Failed"
job.save_meta()
logging.critical("Exception: %s", str(e))
finally:
loop.close()
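bulk_download is executed by an RQ worker rather than the API process; a minimal sketch of launching one for the dls queue (connection details assumed, mirroring the endpoint's Redis config):

import redis
from rq import Queue, Worker

conn = redis.Redis(host="localhost", port=6379, db=0, password="<REDIS_PW>")
worker = Worker([Queue("dls", connection=conn)], connection=conn)
worker.work()  # roughly equivalent to the CLI: rq worker dls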

View File

@@ -1,10 +1,18 @@
from typing import Optional
from typing import Optional, Any
from uuid import uuid4
from urllib.parse import urlparse
import hashlib
import logging
import random
import asyncio
import os
from streamrip.client import TidalClient
from streamrip.config import Config as StreamripConfig
import aiohttp
import time
from streamrip.client import TidalClient # type: ignore
from streamrip.config import Config as StreamripConfig # type: ignore
from dotenv import load_dotenv
load_dotenv()
@@ -38,6 +46,23 @@ class SRUtil:
)
self.streamrip_config
self.streamrip_client = TidalClient(self.streamrip_config)
self.MAX_CONCURRENT_METADATA_REQUESTS = 2
self.METADATA_RATE_LIMIT = 1.25
self.METADATA_SEMAPHORE = asyncio.Semaphore(self.MAX_CONCURRENT_METADATA_REQUESTS)
self.LAST_METADATA_REQUEST = 0
self.MAX_METADATA_RETRIES = 5
self.METADATA_ALBUM_CACHE: dict[str, dict] = {}
self.RETRY_DELAY = 1.0 # seconds between retries
async def rate_limited_request(self, func, *args, **kwargs):
async with self.METADATA_SEMAPHORE:
now = time.time()
elapsed = now - self.LAST_METADATA_REQUEST
if elapsed < self.METADATA_RATE_LIMIT:
await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed)
result = await func(*args, **kwargs)
self.LAST_METADATA_REQUEST = time.time()
return result
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
deduped = {}
@@ -53,16 +78,80 @@ class SRUtil:
m, s = divmod(seconds, 60)
return f"{m}:{s:02}"
def combine_album_track_metadata(
self, album_json: dict | None, track_json: dict
) -> dict:
"""
Combine album-level and track-level metadata into a unified tag dictionary.
Track-level metadata overrides album-level where relevant.
"""
album_json = album_json or {}
# Album-level
combined = {
"album": album_json.get("title"),
"album_artist": album_json.get("artist", {}).get("name"),
"release_date": album_json.get("releaseDate"),
"album_type": album_json.get("type"),
"total_tracks": album_json.get("numberOfTracks"),
"upc": album_json.get("upc"),
"album_copyright": album_json.get("copyright"),
"album_cover_id": album_json.get("cover"),
"album_cover_url": f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg"
if album_json.get("cover")
else None,
}
# Track-level (overrides or adds to album info)
combined.update(
{
"title": track_json.get("title"),
"artist": track_json.get("artist", {}).get("name"),
"artists": [a.get("name") for a in track_json.get("artists", [])],
"track_number": track_json.get("trackNumber"),
"disc_number": track_json.get("volumeNumber"),
"duration": track_json.get("duration"),
"isrc": track_json.get("isrc"),
"bpm": track_json.get("bpm"),
"explicit": track_json.get("explicit"),
"replaygain": track_json.get("replayGain"),
"peak": track_json.get("peak"),
"lyrics": track_json.get("lyrics"),
"track_copyright": track_json.get("copyright"),
"cover_id": track_json.get("album", {}).get("cover") or album_json.get("cover"),
"cover_url": (
f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg"
if (track_json.get("album", {}).get("cover") or album_json.get("cover"))
else None
),
}
)
return combined
def combine_album_with_all_tracks(
self, album_json: dict[str, Any]
) -> list[dict[str, Any]]:
"""Return a list of combined metadata dicts for all tracks in an album JSON."""
return [
self.combine_album_track_metadata(album_json, t)
for t in album_json.get("tracks", [])
]
async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
"""Get artist(s) by name from HiFi API.
"""Get artist(s) by name.
Args:
artist_name (str): The name of the artist.
Returns:
Optional[dict]: The artist details or None if not found.
"""
if not self.streamrip_client.logged_in:
try:
await self.streamrip_client.login()
except Exception as e:
logging.info("Login Exception: %s", str(e))
pass
artists_out: list[dict] = []
try:
artists = await self.streamrip_client.search(
@@ -73,6 +162,7 @@ class SRUtil:
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
logging.critical("Artists output: %s", artists)
artists = artists[0].get("items", [])
if not artists:
logging.warning("No artist found for name: %s", artist_name)
@@ -89,7 +179,7 @@ class SRUtil:
return artists_out
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
"""Get albums by artist ID from HiFi API.
"""Get albums by artist ID
Args:
artist_id (int): The ID of the artist.
Returns:
@@ -98,7 +188,6 @@ class SRUtil:
artist_id_str: str = str(artist_id)
albums_out: list[dict] = []
try:
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
@@ -111,7 +200,7 @@ class SRUtil:
if not metadata:
logging.warning("No metadata found for artist ID: %s", artist_id)
return None
albums = metadata.get("albums", [])
albums = self.dedupe_by_key("title", metadata.get("albums", []))
albums_out = [
{
"artist": ", ".join(artist["name"] for artist in album["artists"]),
@@ -123,18 +212,19 @@ class SRUtil:
if "title" in album and "id" in album and "artists" in album
]
logging.info("Retrieved albums: %s", albums_out)
logging.debug("Retrieved albums: %s", albums_out)
return albums_out
async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]:
"""Get tracks by album ID from HiFi API.
async def get_tracks_by_album_id(
self, album_id: int, quality: str = "FLAC"
) -> Optional[list | dict]:
"""Get tracks by album ID
Args:
album_id (int): The ID of the album.
Returns:
Optional[list[dict]]: List of tracks or None if not found.
"""
album_id_str = str(album_id)
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=album_id_str, media_type="album"
@@ -158,7 +248,7 @@ class SRUtil:
return tracks_out
async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
"""Get track by artist and song name from HiFi API.
"""Get track by artist and song name
Args:
artist (str): The name of the artist.
song (str): The name of the song.
@@ -169,18 +259,37 @@ class SRUtil:
return []
async def get_stream_url_by_track_id(
self, track_id: int, quality: str = "LOSSLESS"
self, track_id: int, quality: str = "FLAC"
) -> Optional[str]:
"""Get stream URL by track ID from HiFi API.
"""Get stream URL by track ID
Args:
track_id (int): The ID of the track.
quality (str): The quality of the stream, default is "FLAC". Other option: "Lossy"
Returns:
Optional[str]: The stream URL or None if not found.
"""
track_id_str = str(track_id)
if quality not in ["FLAC", "Lossy"]:
logging.error("Invalid quality requested: %s", quality)
return None
quality_int: int = int(self.streamrip_config.session.tidal.quality)
match quality:
case "FLAC":
quality_int = 2
case "Lossy":
quality_int = 1
track_id_str: str = str(track_id)
await self.streamrip_client.login()
try:
logging.critical("Using quality_int: %s", quality_int)
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=self.streamrip_config.session.tidal.quality
track_id=track_id_str, quality=quality_int
)
except AttributeError:
await self.streamrip_client.login()
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=quality_int
)
if not track:
logging.warning("No track found for ID: %s", track_id)
@@ -190,3 +299,103 @@ class SRUtil:
logging.warning("No stream URL found for track ID: %s", track_id)
return None
return stream_url
async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
"""
Fetch track + album metadata with retries, caching album data.
Returns combined metadata dict or None after exhausting retries.
"""
for attempt in range(1, self.MAX_METADATA_RETRIES + 1):
try:
await self.streamrip_client.login()
# Track metadata
metadata = await self.rate_limited_request(
self.streamrip_client.get_metadata, str(track_id), "track"
)
album_id = metadata.get("album", {}).get("id")
album_metadata = None
if album_id:
# Check cache first
if album_id in self.METADATA_ALBUM_CACHE:
album_metadata = self.METADATA_ALBUM_CACHE[album_id]
else:
album_metadata = await self.rate_limited_request(
self.streamrip_client.get_metadata, album_id, "album"
)
if not album_metadata:
return None
self.METADATA_ALBUM_CACHE[album_id] = album_metadata
# Combine track + album metadata
if not album_metadata:
return None
combined_metadata: dict = self.combine_album_track_metadata(
album_metadata, metadata
)
logging.info(
"Combined metadata for track ID %s (attempt %d): %s",
track_id,
attempt,
combined_metadata,
)
return combined_metadata
except Exception as e:
# Exponential backoff with jitter for 429 or other errors
delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
logging.warning(
"Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs",
track_id,
attempt,
self.MAX_METADATA_RETRIES,
str(e),
delay,
)
if attempt < self.MAX_METADATA_RETRIES:
await asyncio.sleep(delay)
else:
logging.error(
"Metadata fetch failed permanently for track %s after %d attempts",
track_id,
self.MAX_METADATA_RETRIES,
)
return None
async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str:
"""Download track
Args:
track_id (int)
quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
Returns:
bool
"""
try:
await self.streamrip_client.login()
track_url = await self.get_stream_url_by_track_id(track_id)
if not track_url:
return False
parsed_url = urlparse(track_url)
parsed_url_filename = os.path.basename(parsed_url.path)
parsed_url_ext = os.path.splitext(parsed_url_filename)[1]
unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
dl_folder_path = (
f"{self.streamrip_config.session.downloads.folder}/{unique}"
)
dl_path = f"{dl_folder_path}/{track_id}.{parsed_url_ext}"
async with aiohttp.ClientSession() as session:
async with session.get(
track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60)
) as resp:
resp.raise_for_status()
with open(dl_path, "wb") as f:
async for chunk in resp.content.iter_chunked(1024 * 64):
f.write(chunk)
return dl_path
except Exception as e:
logging.critical("Error: %s", str(e))
return False
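rate_limited_request combines a concurrency cap with a minimum spacing between calls; a distilled sketch of that pattern (names illustrative), which also shows why the timestamp must be written back after each call:

import asyncio
import time

class Throttle:
    def __init__(self, max_concurrent: int = 2, min_interval: float = 1.25):
        self._sem = asyncio.Semaphore(max_concurrent)
        self._min_interval = min_interval
        self._last = 0.0

    async def call(self, func, *args, **kwargs):
        async with self._sem:
            wait = self._min_interval - (time.time() - self._last)
            if wait > 0:
                await asyncio.sleep(wait)
            try:
                return await func(*args, **kwargs)
            finally:
                self._last = time.time()  # forgetting this write disables the throttle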

utils/test.conf (new file, +35)
View File

@@ -0,0 +1,35 @@
# -----------------------
# /m/m2/ PHP handler
location ~ ^/m/m2/(.+\.php)$ {
alias /storage/music2/completed/;
include fastcgi_params;
fastcgi_pass unix:/run/php/php8.2-fpm.sock;
fastcgi_param SCRIPT_FILENAME /storage/music2/completed/$1;
fastcgi_param DOCUMENT_ROOT /storage/music2/completed;
fastcgi_param SCRIPT_NAME /m/m2/$1;
}
# /m/m2/ static files
location /m/m2/ {
alias /storage/music2/completed/;
index index.php;
try_files $uri $uri/ /index.php$is_args$args;
}
# -----------------------
# /m/ PHP handler
location ~ ^/m/(.+\.php)$ {
root /var/www/codey.lol/new/public;
include fastcgi_params;
fastcgi_pass unix:/run/php/php8.2-fpm.sock;
fastcgi_param SCRIPT_FILENAME $document_root/$1;
fastcgi_param DOCUMENT_ROOT $document_root;
fastcgi_param SCRIPT_NAME /m/$1;
}
# /m/ static files
location /m/ {
root /var/www/codey.lol/new/public;
index index.php;
try_files $uri $uri/ /m/index.php$is_args$args;
}