Compare commits

..

17 Commits

Author SHA1 Message Date
3b74333b96 misc 2025-09-12 22:39:59 -04:00
f6d4ed57f3 misc 2025-09-09 15:50:13 -04:00
a57173b90a TRip: change file naming, use pigz for faster tarball creation 2025-08-29 10:23:06 -04:00
a11748775e TRip: capitalize RQ job statuses in related endpoints, order job list, other: minor/typing 2025-08-23 08:20:32 -04:00
a8d089c0fe minor 2025-08-21 15:35:10 -04:00
dd8d07b2f0 formatting 2025-08-21 15:08:13 -04:00
22eaa2260e another commit without a list of specific changes! (misc) 2025-08-21 15:06:56 -04:00
e0f64f6773 misc 2025-08-20 15:58:07 -04:00
81f79dea1e misc 2025-08-20 07:32:57 -04:00
3cebe14674 misc / TRip: folder structure / tar naming 2025-08-15 14:58:06 -04:00
27fa1f78ed misc 2025-08-15 14:15:13 -04:00
0cd4a71db2 formatting / RQ tuning 2025-08-15 13:39:27 -04:00
93050ec6cf misc / RQ bulk downloads for TRip 2025-08-15 13:31:15 -04:00
72a7734152 minor 2025-08-11 15:06:58 -04:00
4cbd0fb934 docstrings 2025-08-11 14:06:42 -04:00
0fc78b08e4 oops 2025-08-11 14:05:20 -04:00
f1401ee6bf oops 2025-08-11 14:04:22 -04:00
17 changed files with 1176 additions and 277 deletions

3
.gitignore vendored
View File

@@ -25,5 +25,8 @@ endpoints/auth.py
endpoints/radio2 endpoints/radio2
endpoints/radio2/** endpoints/radio2/**
hash_password.py hash_password.py
up.py
job_review.py
check_missing.py
**/auth/* **/auth/*
.gitignore .gitignore

View File

@@ -9,9 +9,10 @@ from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from lyric_search.sources import redis_cache from lyric_search.sources import redis_cache
logging.basicConfig(level=logging.INFO)
logging.getLogger("aiosqlite").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.INFO)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
app = FastAPI( app = FastAPI(
@@ -22,7 +23,6 @@ app = FastAPI(
loop=loop, loop=loop,
) )
constants = importlib.import_module("constants").Constants() constants = importlib.import_module("constants").Constants()
util = importlib.import_module("util").Utilities(app, constants) util = importlib.import_module("util").Utilities(app, constants)

View File

@@ -1,6 +1,9 @@
from typing import Optional from typing import Optional
from typing import Literal
from pydantic import BaseModel from pydantic import BaseModel
Station = Literal["main", "rock", "rap", "electronic", "pop"]
""" """
LastFM LastFM
""" """
@@ -96,25 +99,6 @@ class ValidYTSearchRequest(BaseModel):
t: str = "rick astley - never gonna give you up" t: str = "rick astley - never gonna give you up"
"""
XC
"""
class ValidXCRequest(BaseModel):
"""
- **key**: valid XC API key
- **bid**: bot id
- **cmd**: bot command
- **data**: command data
"""
key: str
bid: int
cmd: str
data: Optional[dict]
""" """
Transcriptions Transcriptions
""" """
@@ -211,7 +195,7 @@ class ValidRadioSongRequest(BaseModel):
song: Optional[str] = None song: Optional[str] = None
artistsong: Optional[str] = None artistsong: Optional[str] = None
alsoSkip: Optional[bool] = False alsoSkip: Optional[bool] = False
station: str = "main" station: Station = "main"
class ValidRadioTypeaheadRequest(BaseModel): class ValidRadioTypeaheadRequest(BaseModel):
@@ -241,7 +225,7 @@ class ValidRadioNextRequest(BaseModel):
key: str key: str
skipTo: Optional[str] = None skipTo: Optional[str] = None
station: str = "main" station: Station = "main"
class ValidRadioReshuffleRequest(ValidRadioNextRequest): class ValidRadioReshuffleRequest(ValidRadioNextRequest):
@@ -262,7 +246,7 @@ class ValidRadioQueueRequest(BaseModel):
draw: Optional[int] = 1 draw: Optional[int] = 1
start: Optional[int] = 0 start: Optional[int] = 0
search: Optional[str] = None search: Optional[str] = None
station: str = "main" station: Station = "main"
class ValidRadioQueueShiftRequest(BaseModel): class ValidRadioQueueShiftRequest(BaseModel):
@@ -276,7 +260,7 @@ class ValidRadioQueueShiftRequest(BaseModel):
key: str key: str
uuid: str uuid: str
next: Optional[bool] = False next: Optional[bool] = False
station: str = "main" station: Station = "main"
class ValidRadioQueueRemovalRequest(BaseModel): class ValidRadioQueueRemovalRequest(BaseModel):
@@ -288,4 +272,4 @@ class ValidRadioQueueRemovalRequest(BaseModel):
key: str key: str
uuid: str uuid: str
station: str = "main" station: Station = "main"

View File

@@ -30,7 +30,7 @@ class Meme(FastAPI):
f"/{endpoint}", f"/{endpoint}",
handler, handler,
methods=["GET"], methods=["GET"],
include_in_schema=True, include_in_schema=False,
dependencies=dependencies, dependencies=dependencies,
) )

View File

@@ -3,11 +3,18 @@ import time
import os import os
import json import json
import random import random
from typing import Optional, Annotated from typing import Any, Optional, Annotated
from fastapi import FastAPI, Request, UploadFile, Response, HTTPException, Form, Depends from fastapi import FastAPI, Request, UploadFile, Response, HTTPException, Form, Depends
from fastapi_throttle import RateLimiter from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
import redis.asyncio as redis import redis
from rq import Queue
from rq.registry import (
StartedJobRegistry,
FinishedJobRegistry,
FailedJobRegistry,
DeferredJobRegistry,
)
from lyric_search.sources import private, cache as LyricsCache, redis_cache from lyric_search.sources import private, cache as LyricsCache, redis_cache
@@ -22,7 +29,7 @@ class Misc(FastAPI):
self.constants = constants self.constants = constants
self.lyr_cache = LyricsCache.Cache() self.lyr_cache = LyricsCache.Cache()
self.redis_cache = redis_cache.RedisCache() self.redis_cache = redis_cache.RedisCache()
self.redis_client = redis.Redis(password=private.REDIS_PW) self.redis_client: Any = redis.Redis(password=private.REDIS_PW)
self.radio = radio self.radio = radio
self.activity_image: Optional[bytes] = None self.activity_image: Optional[bytes] = None
self.nos_json_path: str = os.path.join( self.nos_json_path: str = os.path.join(
@@ -35,6 +42,7 @@ class Misc(FastAPI):
"widget/sqlite": self.homepage_sqlite_widget, "widget/sqlite": self.homepage_sqlite_widget,
"widget/lyrics": self.homepage_lyrics_widget, "widget/lyrics": self.homepage_lyrics_widget,
"widget/radio": self.homepage_radio_widget, "widget/radio": self.homepage_radio_widget,
"widget/rq": self.homepage_rq_widget,
"misc/get_activity_image": self.get_activity_image, "misc/get_activity_image": self.get_activity_image,
"misc/no": self.no, "misc/no": self.no,
} }
@@ -141,14 +149,14 @@ class Misc(FastAPI):
""" """
# Measure response time w/ test lyric search # Measure response time w/ test lyric search
time_start: float = time.time() # Start time for response_time time_start: float = time.time() # Start time for response_time
test_lyrics_result = await self.redis_client.ft().search( # noqa: F841 test_lyrics_result = self.redis_client.ft().search( # noqa: F841
"@artist: test @song: test" "@artist: test @song: test"
) )
time_end: float = time.time() time_end: float = time.time()
# End response time test # End response time test
total_keys = await self.redis_client.dbsize() total_keys = self.redis_client.dbsize()
response_time: float = time_end - time_start response_time: float = time_end - time_start
index_info = await self.redis_client.ft().info() index_info = self.redis_client.ft().info()
indexed_lyrics: int = index_info.get("num_docs") indexed_lyrics: int = index_info.get("num_docs")
return JSONResponse( return JSONResponse(
content={ content={
@@ -159,6 +167,30 @@ class Misc(FastAPI):
} }
) )
async def homepage_rq_widget(self) -> JSONResponse:
"""
Homepage RQ Widget Handler
"""
queue_name = "dls"
queue = Queue(queue_name, self.redis_client)
queued = queue.count
started = StartedJobRegistry(queue_name, connection=self.redis_client).count
failed = FailedJobRegistry(queue_name, connection=self.redis_client).count
finished = FinishedJobRegistry(queue_name, connection=self.redis_client).count
deferred = DeferredJobRegistry(queue_name, connection=self.redis_client).count
return JSONResponse(
content={
queue_name: {
"queued": queued,
"started": started,
"failed": failed,
"finished": finished,
"deferred": deferred,
}
}
)
async def homepage_sqlite_widget(self) -> JSONResponse: async def homepage_sqlite_widget(self) -> JSONResponse:
""" """
Homepage SQLite Widget Handler Homepage SQLite Widget Handler

View File

@@ -10,6 +10,7 @@ from .constructors import (
ValidRadioSongRequest, ValidRadioSongRequest,
ValidRadioTypeaheadRequest, ValidRadioTypeaheadRequest,
ValidRadioQueueRequest, ValidRadioQueueRequest,
Station
) )
from utils import radio_util from utils import radio_util
from typing import Optional from typing import Optional
@@ -43,23 +44,19 @@ class Radio(FastAPI):
"radio/reshuffle": self.radio_reshuffle, "radio/reshuffle": self.radio_reshuffle,
"radio/queue_remove": self.radio_queue_remove, "radio/queue_remove": self.radio_queue_remove,
"radio/ls._next_": self.radio_get_next, "radio/ls._next_": self.radio_get_next,
"radio/album_art": self.album_art_handler,
} }
for endpoint, handler in self.endpoints.items(): for endpoint, handler in self.endpoints.items():
methods: list[str] = ["POST"]
if endpoint == "radio/album_art":
methods = ["GET"]
app.add_api_route( app.add_api_route(
f"/{endpoint}", handler, methods=["POST"], include_in_schema=True, f"/{endpoint}", handler, methods=methods, include_in_schema=False,
dependencies=[Depends( dependencies=[Depends(
RateLimiter(times=10, seconds=2))] if not endpoint == "radio/np" else None, RateLimiter(times=10, seconds=2))] if not endpoint == "radio/np" else None,
) )
# NOTE: Not in loop because method is GET for this endpoint
app.add_api_route(
"/radio/album_art",
self.album_art_handler,
methods=["GET"],
include_in_schema=True,
)
app.add_event_handler("startup", self.on_start) app.add_event_handler("startup", self.on_start)
async def on_start(self) -> None: async def on_start(self) -> None:
@@ -135,21 +132,35 @@ class Radio(FastAPI):
""" """
Get current play queue (paged, 20 results per page) Get current play queue (paged, 20 results per page)
""" """
if not (data and data.station):
return JSONResponse(status_code=500,
content={
"err": True,
"errorText": "Invalid request.",
})
search: Optional[str] = None search: Optional[str] = None
draw: int = 0 draw: int = 0
if isinstance(data, ValidRadioQueueRequest): if isinstance(data, ValidRadioQueueRequest):
search = data.search search = data.search
draw = data.draw draw = data.draw or 0
start: int = int(data.start) start: int = int(data.start or 0)
end: int = start + 20 end: int = start + 20
else: else:
start: int = 0 start: int = 0
end: int = 20 end: int = 20
orig_queue: list[dict] = self.radio_util.active_playlist[data.station] orig_queue: list[dict] = self.radio_util.active_playlist[data.station]
if not search: if not search:
queue_full: list = orig_queue queue_full: Optional[list] = orig_queue
else: else:
queue_full: list = self.radio_util.datatables_search(data.search, data.station) queue_full = self.radio_util.datatables_search(search, data.station)
if not queue_full:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "No queue found.",
}
)
queue: list = queue_full[start:end] queue: list = queue_full[start:end]
queue_out: list[dict] = [] queue_out: list[dict] = []
for x, item in enumerate(queue): for x, item in enumerate(queue):
@@ -240,7 +251,7 @@ class Radio(FastAPI):
async def album_art_handler( async def album_art_handler(
self, request: Request, track_id: Optional[int] = None, self, request: Request, track_id: Optional[int] = None,
station: Optional[str] = "main" station: Station = "main"
) -> Response: ) -> Response:
""" """
Get album art, optional parameter track_id may be specified. Get album art, optional parameter track_id may be specified.
@@ -251,6 +262,13 @@ class Radio(FastAPI):
try: try:
if not track_id: if not track_id:
track_id = self.radio_util.now_playing[station].get("id") track_id = self.radio_util.now_playing[station].get("id")
if not track_id:
# Still no track ID
return JSONResponse(status_code=500,
content={
"err": True,
"errorText": "Invalid request",
})
logging.debug("Seeking album art with trackId: %s", track_id) logging.debug("Seeking album art with trackId: %s", track_id)
album_art: Optional[bytes] = self.radio_util.get_album_art( album_art: Optional[bytes] = self.radio_util.get_album_art(
track_id=track_id track_id=track_id
@@ -269,7 +287,7 @@ class Radio(FastAPI):
) )
async def radio_now_playing(self, request: Request, async def radio_now_playing(self, request: Request,
station: Optional[str] = "main") -> JSONResponse: station: Station = "main") -> JSONResponse:
""" """
Get currently playing track info Get currently playing track info
- **station**: default "main" - **station**: default "main"

View File

@@ -41,16 +41,16 @@ class RandMsg(FastAPI):
db_rand_selected: int = 9 db_rand_selected: int = 9
db_rand_selected = random.choice([3]) db_rand_selected = random.choice([3])
title_attr: str = "Unknown" title_attr: str = "Unknown"
randmsg_db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None
match db_rand_selected: match db_rand_selected:
case 0: case 0:
randmsg_db_path: Union[str, LiteralString] = os.path.join( randmsg_db_path = os.path.join(
"/usr/local/share", "sqlite_dbs", "qajoke.db" "/usr/local/share", "sqlite_dbs", "qajoke.db"
) # For qajoke db ) # For qajoke db
db_query: str = ( db_query = "SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
"SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
|| answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db || answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db
)
title_attr = "QA Joke DB" title_attr = "QA Joke DB"
case 1 | 9: case 1 | 9:
randmsg_db_path = os.path.join( randmsg_db_path = os.path.join(
@@ -90,9 +90,20 @@ class RandMsg(FastAPI):
WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1""" WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1"""
title_attr = "r/jokes DB" title_attr = "r/jokes DB"
if not randmsg_db_path:
return JSONResponse(
content={
"err": True,
}
)
async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db: async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor: async with await _db.execute(db_query) as _cursor:
result: sqlite3.Row = await _cursor.fetchone() if not isinstance(_cursor, sqlite3.Cursor):
return JSONResponse(content={"err": True})
result: Optional[sqlite3.Row] = await _cursor.fetchone()
if not result:
return JSONResponse(content={"err": True})
(result_id, result_msg) = result (result_id, result_msg) = result
result_msg = result_msg.strip() result_msg = result_msg.strip()
return JSONResponse( return JSONResponse(

View File

@@ -2,10 +2,29 @@ import logging
from fastapi import FastAPI, Request, Response, Depends from fastapi import FastAPI, Request, Response, Depends
from fastapi_throttle import RateLimiter from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from utils.hifi_wrapper import HifiUtil from utils.sr_wrapper import SRUtil
from auth.deps import get_current_user from auth.deps import get_current_user
from redis import Redis
from rq import Queue, Retry
from rq.job import Job
from rq.job import JobStatus
from rq.registry import (
StartedJobRegistry,
DeferredJobRegistry,
FinishedJobRegistry,
FailedJobRegistry,
ScheduledJobRegistry,
)
from utils.rip_background import bulk_download
from lyric_search.sources import private
from typing import Literal
from pydantic import BaseModel
logging.getLogger().setLevel(logging.INFO)
class ValidBulkFetchRequest(BaseModel):
track_ids: list[int]
target: str
quality: Literal["FLAC", "Lossy"] = "FLAC"
class RIP(FastAPI): class RIP(FastAPI):
@@ -16,14 +35,30 @@ class RIP(FastAPI):
def __init__(self, app: FastAPI, my_util, constants) -> None: def __init__(self, app: FastAPI, my_util, constants) -> None:
self.app: FastAPI = app self.app: FastAPI = app
self.util = my_util self.util = my_util
self.trip_util = HifiUtil() self.trip_util = SRUtil()
self.constants = constants self.constants = constants
self.redis_conn = Redis(
host="localhost",
port=6379,
db=0,
password=private.REDIS_PW,
)
self.task_queue = Queue(
"dls",
connection=self.redis_conn,
default_timeout=14400,
default_result_ttl=-1,
default_failure_ttl=86400,
)
self.endpoints: dict = { self.endpoints: dict = {
"trip/get_artists_by_name": self.artists_by_name_handler, "trip/get_artists_by_name": self.artists_by_name_handler,
"trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler, "trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler,
"trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler, "trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler,
"trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler, "trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler,
"trip/get_track_by_id/{track_id:path}": self.track_by_id_handler, "trip/get_track_by_id/{track_id:path}": self.track_by_id_handler,
"trip/bulk_fetch": self.bulk_fetch_handler,
"trip/job/{job_id:path}": self.job_status_handler,
"trip/jobs/list": self.job_list_handler,
} }
for endpoint, handler in self.endpoints.items(): for endpoint, handler in self.endpoints.items():
@@ -31,11 +66,37 @@ class RIP(FastAPI):
app.add_api_route( app.add_api_route(
f"/{endpoint}", f"/{endpoint}",
handler, handler,
methods=["GET"], methods=["GET"] if endpoint != "trip/bulk_fetch" else ["POST"],
include_in_schema=True, include_in_schema=True,
dependencies=dependencies, dependencies=dependencies,
) )
def _format_job(self, job: Job):
"""Helper to normalize job data into JSON."""
job_status: str | JobStatus = job.get_status()
progress = job.meta.get("progress", 0)
if progress == 100 and not job.meta.get("tarball"):
job_status = "Compressing"
tracks_in = job.meta.get("tracks_in")
tracks_out = len(job.meta.get("tracks", []))
return {
"id": job.id,
"status": job_status.title(),
"result": job.result,
"tarball": job.meta.get("tarball"),
"enqueued_at": job.enqueued_at,
"started_at": job.started_at,
"ended_at": job.ended_at,
"progress": progress,
"tracks": f"{tracks_out} / {tracks_in}"
if isinstance(tracks_in, int)
else tracks_out,
"target": job.meta.get("target"),
"quality": job.meta.get("quality", "Unknown"),
}
async def artists_by_name_handler( async def artists_by_name_handler(
self, artist: str, request: Request, user=Depends(get_current_user) self, artist: str, request: Request, user=Depends(get_current_user)
) -> Response: ) -> Response:
@@ -55,10 +116,14 @@ class RIP(FastAPI):
return JSONResponse(content=albums) return JSONResponse(content=albums)
async def tracks_by_album_id_handler( async def tracks_by_album_id_handler(
self, album_id: int, request: Request, user=Depends(get_current_user) self,
album_id: int,
request: Request,
user=Depends(get_current_user),
quality: Literal["FLAC", "Lossy"] = "FLAC",
) -> Response: ) -> Response:
"""Get tracks by album id""" """Get tracks by album id"""
tracks = await self.trip_util.get_tracks_by_album_id(album_id) tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality)
if not tracks: if not tracks:
return Response(status_code=404, content="Not Found") return Response(status_code=404, content="Not Found")
return JSONResponse(content=tracks) return JSONResponse(content=tracks)
@@ -74,10 +139,143 @@ class RIP(FastAPI):
return JSONResponse(content=tracks) return JSONResponse(content=tracks)
async def track_by_id_handler( async def track_by_id_handler(
self, track_id: int, request: Request, user=Depends(get_current_user) self,
track_id: int,
request: Request,
quality: Literal["FLAC", "Lossy"] = "FLAC",
user=Depends(get_current_user),
) -> Response: ) -> Response:
"""Get track by ID""" """Get track by ID"""
track = await self.trip_util.get_stream_url_by_track_id(track_id) track = await self.trip_util.get_stream_url_by_track_id(track_id, quality)
if not track: if not track:
return Response(status_code=404, content="Not found") return Response(status_code=404, content="Not found")
return JSONResponse(content={"stream_url": track}) return JSONResponse(content={"stream_url": track})
async def bulk_fetch_handler(
self,
data: ValidBulkFetchRequest,
request: Request,
user=Depends(get_current_user),
) -> Response:
"""Bulk fetch a list of track IDs"""
if not data or not data.track_ids or not data.target:
return JSONResponse(
content={
"err": True,
"errorText": "Invalid data",
}
)
track_ids = data.track_ids
target = data.target
job = self.task_queue.enqueue(
bulk_download,
args=(
track_ids,
data.quality,
),
job_timeout=14400,
failure_ttl=86400,
result_ttl=-1,
retry=Retry(max=1, interval=[30]),
meta={
"progress": 0,
"status": "Queued",
"target": target,
"tracks_in": len(track_ids),
"quality": data.quality,
},
)
self.redis_conn.lpush("enqueued_job_ids", job.id)
return JSONResponse(
content={
"job_id": job.id,
"status": "Queued",
"target": job.meta.get("target", None),
"quality": job.meta.get("quality", "Unknown"),
}
)
async def job_status_handler(
self, job_id: str, request: Request, user=Depends(get_current_user)
):
"""Get status and result of a single job"""
job = None
try:
# Try direct fetch first
job = Job.fetch(job_id, connection=self.redis_conn)
except Exception:
# If not found, try registries explicitly (in case fetch failed because the job left the queue)
registries = [
StartedJobRegistry(queue=self.task_queue),
FinishedJobRegistry(queue=self.task_queue),
FailedJobRegistry(queue=self.task_queue),
DeferredJobRegistry(queue=self.task_queue),
ScheduledJobRegistry(queue=self.task_queue),
]
for registry in registries:
if job_id in registry.get_job_ids():
try:
job = Job.fetch(job_id, connection=self.redis_conn)
except Exception:
pass
break
if job is None:
return JSONResponse({"error": "Job not found"}, status_code=404)
return self._format_job(job)
async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
"""List all jobs across all registries (queued, started, finished, failed, etc)."""
jobs_info = []
seen = set()
# 1. Jobs still waiting in queue
for job in self.task_queue.jobs:
jobs_info.append(self._format_job(job))
seen.add(job.id)
# 2. Jobs in Started/Finished/Failed/Deferred registries
registries = [
StartedJobRegistry(queue=self.task_queue),
FinishedJobRegistry(queue=self.task_queue),
FailedJobRegistry(queue=self.task_queue),
DeferredJobRegistry(queue=self.task_queue),
]
for registry in registries:
for jid in registry.get_job_ids():
if jid in seen:
continue
try:
job = Job.fetch(jid, connection=self.redis_conn)
jobs_info.append(self._format_job(job))
seen.add(job.id)
except Exception:
continue # job might have been cleaned up
# 3. Jobs tracked in your custom enqueued_job_ids list
job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1)
for jid_bytes in job_ids: # type: ignore
jid = jid_bytes.decode()
if jid in seen:
continue
try:
job = Job.fetch(jid, connection=self.redis_conn)
jobs_info.append(self._format_job(job))
seen.add(job.id)
except Exception:
continue
# ---- Sort newest first ----
def job_sort_key(job):
return (
job.get("ended_at")
or job.get("started_at")
or job.get("enqueued_at")
or 0
)
jobs_info.sort(key=job_sort_key, reverse=True)
return {"jobs": jobs_info}

View File

@@ -1,9 +1,9 @@
import os import os
import aiosqlite as sqlite3 import aiosqlite as sqlite3
from fastapi import FastAPI, Depends from fastapi import FastAPI, Depends, Response
from fastapi_throttle import RateLimiter from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union from typing import Optional, LiteralString, Union, Iterable, cast
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
@@ -88,7 +88,7 @@ class Transcriptions(FastAPI):
async with sqlite3.connect(database=db_path, timeout=1) as _db: async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor: async with await _db.execute(db_query) as _cursor:
result: list[tuple] = await _cursor.fetchall() result: Iterable[sqlite3.Row] = await _cursor.fetchall()
return JSONResponse( return JSONResponse(
content={ content={
"show_title": show_title, "show_title": show_title,
@@ -104,7 +104,7 @@ class Transcriptions(FastAPI):
async def get_episode_lines_handler( async def get_episode_lines_handler(
self, data: ValidShowEpisodeLineRequest self, data: ValidShowEpisodeLineRequest
) -> JSONResponse: ) -> Response:
""" """
Get lines for a particular episode Get lines for a particular episode
- **s**: Show ID to query - **s**: Show ID to query
@@ -138,8 +138,14 @@ class Transcriptions(FastAPI):
async with sqlite3.connect(database=db_path, timeout=1) as _db: async with sqlite3.connect(database=db_path, timeout=1) as _db:
params: tuple = (episode_id,) params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor: async with await _db.execute(db_query, params) as _cursor:
result: list[tuple] = await _cursor.fetchall() result: Iterable[sqlite3.Row] = await _cursor.fetchall()
first_result: tuple = result[0] result_list = cast(list[sqlite3.Row], result)
if not result_list:
return Response(
status_code=404,
content="Not found",
)
first_result: sqlite3.Row = result_list[0]
return JSONResponse( return JSONResponse(
content={ content={
"episode_id": episode_id, "episode_id": episode_id,

View File

@@ -66,6 +66,8 @@ class Cache:
confidence=row["confidence"], confidence=row["confidence"],
) )
else: else:
if not sqlite_rows:
return None
for row in sqlite_rows: for row in sqlite_rows:
if row[0] == matched_id: if row[0] == matched_id:
(_id, artist, song, lyrics, original_src) = row[:-1] (_id, artist, song, lyrics, original_src) = row[:-1]

View File

@@ -45,8 +45,8 @@ class Genius:
Optional[LyricsResult]: The result, if found - None otherwise. Optional[LyricsResult]: The result, if found - None otherwise.
""" """
try: try:
artist: str = artist.strip().lower() artist = artist.strip().lower()
song: str = song.strip().lower() song = song.strip().lower()
time_start: float = time.time() time_start: float = time.time()
logging.info("Searching %s - %s on %s", artist, song, self.label) logging.info("Searching %s - %s on %s", artist, song, self.label)
search_term: str = f"{artist}%20{song}" search_term: str = f"{artist}%20{song}"
@@ -56,7 +56,6 @@ class Genius:
f"{self.genius_search_url}{search_term}", f"{self.genius_search_url}{search_term}",
timeout=self.timeout, timeout=self.timeout,
headers=self.headers, headers=self.headers,
verify_ssl=False,
proxy=private.GENIUS_PROXY, proxy=private.GENIUS_PROXY,
) as request: ) as request:
request.raise_for_status() request.raise_for_status()
@@ -113,7 +112,6 @@ class Genius:
scrape_url, scrape_url,
timeout=self.timeout, timeout=self.timeout,
headers=self.headers, headers=self.headers,
verify_ssl=False,
proxy=private.GENIUS_PROXY, proxy=private.GENIUS_PROXY,
) as scrape_request: ) as scrape_request:
scrape_request.raise_for_status() scrape_request.raise_for_status()

View File

@@ -1,195 +0,0 @@
from aiohttp import ClientSession, ClientTimeout
from typing import Optional
from urllib.parse import urlencode, quote
import logging
import os
import asyncio
from streamrip.client import TidalClient
from streamrip.config import Config as StreamripConfig
from dotenv import load_dotenv
load_dotenv()
class SRUtil:
"""
StreamRip API Utility Class
"""
def __init__(self) -> None:
"""Initialize StreamRip utility."""
self.streamrip_config = StreamripConfig.defaults()
self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "")
self.streamrip_config.session.tidal.access_token = os.getenv(
"tidal_access_token", ""
)
self.streamrip_config.session.tidal.refresh_token = os.getenv(
"tidal_refresh_token", ""
)
self.streamrip_config.session.tidal.token_expiry = os.getenv(
"tidal_token_expiry", ""
)
self.streamrip_config.session.tidal.country_code = os.getenv(
"tidal_country_code", ""
)
self.streamrip_config.session.tidal.quality = int(
os.getenv("tidal_default_quality", 2)
)
self.streamrip_config.session.conversion.enabled = False
self.streamrip_config.session.downloads.folder = os.getenv(
"tidal_download_folder", ""
)
self.streamrip_config
self.streamrip_client = TidalClient(self.streamrip_config)
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
deduped = {}
for entry in entries:
norm = entry[key].strip().lower()
if norm not in deduped:
deduped[norm] = entry
return list(deduped.values())
def format_duration(self, seconds):
if not seconds:
return None
m, s = divmod(seconds, 60)
return f"{m}:{s:02}"
async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
"""Get artist(s) by name from HiFi API.
Args:
artist_name (str): The name of the artist.
Returns:
Optional[dict]: The artist details or None if not found.
"""
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
artists_out: list[dict] = []
try:
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
except AttributeError:
await self.streamrip_client.login()
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
artists = artists[0].get("items", [])
if not artists:
logging.warning("No artist found for name: %s", artist_name)
return None
artists_out = [
{
"artist": res["name"],
"id": res["id"],
}
for res in artists
if "name" in res and "id" in res
]
artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates
return artists_out
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
"""Get albums by artist ID from HiFi API.
Args:
artist_id (int): The ID of the artist.
Returns:
Optional[list[dict]]: List of albums or None if not found.
"""
artist_id_str: str = str(artist_id)
albums_out: list[dict] = []
try:
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
)
except AttributeError:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
)
if not metadata:
logging.warning("No metadata found for artist ID: %s", artist_id)
return None
albums = metadata.get("albums", [])
albums_out = [
{
"artist": ", ".join(artist["name"] for artist in album["artists"]),
"album": album["title"],
"id": album["id"],
"release_date": album.get("releaseDate", "Unknown"),
}
for album in albums
if "title" in album and "id" in album and "artists" in album
]
logging.info("Retrieved albums: %s", albums_out)
return albums_out
async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]:
"""Get tracks by album ID from HiFi API.
Args:
album_id (int): The ID of the album.
Returns:
Optional[list[dict]]: List of tracks or None if not found.
"""
album_id_str = str(album_id)
if not self.streamrip_client.logged_in:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=album_id_str, media_type="album"
)
if not metadata:
logging.warning("No metadata found for album ID: %s", album_id)
return None
track_list = metadata.get("tracks", [])
tracks_out: list[dict] = [
{
"id": track.get("id"),
"artist": track.get("artist").get("name"),
"title": track.get("title"),
"duration": self.format_duration(track.get("duration", 0)),
"version": track.get("version"),
"audioQuality": track.get("audioQuality"),
}
for track in track_list
]
return tracks_out
async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
"""Get track by artist and song name from HiFi API.
Args:
artist (str): The name of the artist.
song (str): The name of the song.
Returns:
Optional[dict]: The track details or None if not found.
TODO: Reimplement using StreamRip
"""
return []
async def get_stream_url_by_track_id(
self, track_id: int, quality: str = "LOSSLESS"
) -> Optional[str]:
"""Get stream URL by track ID from HiFi API.
Args:
track_id (int): The ID of the track.
quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
Returns:
Optional[str]: The stream URL or None if not found.
"""
track_id_str = str(track_id)
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=self.streamrip_config.session.tidal.quality
)
if not track:
logging.warning("No track found for ID: %s", track_id)
return None
stream_url = track.url
if not stream_url:
logging.warning("No stream URL found for track ID: %s", track_id)
return None
return stream_url

View File

@@ -26,6 +26,7 @@ class MemeUtil:
bool bool
""" """
# Accepts either bytes or a BytesIO-like object # Accepts either bytes or a BytesIO-like object
signature = None
if isinstance(buffer, io.BytesIO): if isinstance(buffer, io.BytesIO):
if hasattr(buffer, "read") and hasattr(buffer, "seek"): if hasattr(buffer, "read") and hasattr(buffer, "seek"):
pos = buffer.tell() pos = buffer.tell()
@@ -153,6 +154,8 @@ class MemeUtil:
query: str = "SELECT count(id) AS count FROM memes" query: str = "SELECT count(id) AS count FROM memes"
async with await db_conn.execute(query) as db_cursor: async with await db_conn.execute(query) as db_cursor:
result = await db_cursor.fetchone() result = await db_cursor.fetchone()
if not result:
return None
count = result["count"] count = result["count"]
if not isinstance(count, int): if not isinstance(count, int):
return None return None

View File

@@ -51,7 +51,7 @@ class RadioUtil:
'main': self.constants.RADIO_DB_QUERY, 'main': self.constants.RADIO_DB_QUERY,
'rap': self.constants.RADIO_DB_QUERY_RAP, 'rap': self.constants.RADIO_DB_QUERY_RAP,
'pop': self.constants.RADIO_DB_QUERY_POP, 'pop': self.constants.RADIO_DB_QUERY_POP,
'classical': self.constants.RADIO_DB_QUERY_CLASSICAL, # 'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
'rock': self.constants.RADIO_DB_QUERY_ROCK, 'rock': self.constants.RADIO_DB_QUERY_ROCK,
'electronic': self.constants.RADIO_DB_QUERY_ELECTRONIC, 'electronic': self.constants.RADIO_DB_QUERY_ELECTRONIC,
} }
@@ -77,7 +77,6 @@ class RadioUtil:
"rock", "rock",
"rap", "rap",
"electronic", "electronic",
"classical",
"pop", "pop",
] ]
self.active_playlist: dict[str, list[dict]] = {} self.active_playlist: dict[str, list[dict]] = {}
@@ -152,10 +151,10 @@ class RadioUtil:
filter = filter.strip().lower() filter = filter.strip().lower()
matched: list[dict] = [] matched: list[dict] = []
for item in self.active_playlist[station]: for item in self.active_playlist[station]:
artist: str = item.get("artist", None) artist: str = item.get("artist", "")
song: str = item.get("song", None) song: str = item.get("song", "")
artistsong: str = item.get("artistsong", None) artistsong: str = item.get("artistsong", "")
album: str = item.get("album", None) album: str = item.get("album", "")
if not artist or not song or not artistsong: if not artist or not song or not artistsong:
continue continue
if non_alnum.sub("", filter) in non_alnum.sub("", artistsong).lower(): if non_alnum.sub("", filter) in non_alnum.sub("", artistsong).lower():
@@ -201,6 +200,8 @@ class RadioUtil:
search_song = song search_song = song
if not artistsong: if not artistsong:
artistsong = f"{search_artist} - {search_song}" artistsong = f"{search_artist} - {search_song}"
if not search_artist or not search_song or not artistsong:
raise RadioException("No query provided")
search_params = ( search_params = (
search_artist.lower(), search_artist.lower(),
search_song.lower(), search_song.lower(),
@@ -280,6 +281,8 @@ class RadioUtil:
""" """
try: try:
added_rows: int = 0 added_rows: int = 0
artist = None
genre = None
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db: with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
for pair in pairs: for pair in pairs:
try: try:
@@ -388,11 +391,10 @@ class RadioUtil:
for playlist in self.playlists: for playlist in self.playlists:
playlist_redis_key: str = f"playlist:{playlist}" playlist_redis_key: str = f"playlist:{playlist}"
_playlist = await self.redis_client.json().get(playlist_redis_key) _playlist = await self.redis_client.json().get(playlist_redis_key) # type: ignore
if playlist not in self.active_playlist.keys(): if playlist not in self.active_playlist.keys():
self.active_playlist[playlist] = [] self.active_playlist[playlist] = []
if not playlist == "rock": random.shuffle(_playlist)
random.shuffle(_playlist) # Temp/for Cocteau Twins
self.active_playlist[playlist] = [ self.active_playlist[playlist] = [
{ {
"uuid": str(uuid().hex), "uuid": str(uuid().hex),
@@ -418,7 +420,7 @@ class RadioUtil:
logging.info("Removing duplicate tracks...") logging.info("Removing duplicate tracks...")
dedupe_processed = [] dedupe_processed = []
for item in self.active_playlist[playlist]: for item in self.active_playlist[playlist]:
artistsongabc: str = non_alnum.sub("", item.get("artistsong", None)) artistsongabc: str = non_alnum.sub("", item.get("artistsong", ""))
if not artistsongabc: if not artistsongabc:
logging.info("Missing artistsong: %s", item) logging.info("Missing artistsong: %s", item)
continue continue

401
utils/rip_background.py Normal file
View File

@@ -0,0 +1,401 @@
import logging
import asyncio
import random
import os
import tarfile
import traceback
import uuid
import subprocess
import shutil
import re
from pathlib import Path
from urllib.parse import urlparse, unquote
import aiohttp
from datetime import datetime
from mediafile import MediaFile # type: ignore[import]
from rq import get_current_job
from utils.sr_wrapper import SRUtil
# ---------- Config ----------
ROOT_DIR = Path("/storage/music2")
MAX_RETRIES = 5
THROTTLE_MIN = 1.0
THROTTLE_MAX = 3.5
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/116.0.5845.97 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Connection": "keep-alive",
}
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
sr = SRUtil()
# ---------- Helpers ----------
def tag_with_mediafile(file_path: str, meta: dict):
f = MediaFile(file_path)
# --- Helper to safely set textual/number fields ---
def safe_set(attr, value, default=None, cast=None):
if value is None:
value = default
if value is not None:
if cast is not None:
setattr(f, attr, cast(value))
else:
setattr(f, attr, str(value))
# --- Basic textual metadata ---
safe_set("title", meta.get("title"), default="Unknown Title")
safe_set("artist", meta.get("artist"), default="Unknown Artist")
safe_set("albumartist", meta.get("album_artist"), default="Unknown Artist")
safe_set("album", meta.get("album"), default="Unknown Album")
safe_set("track", meta.get("track_number"), default=0, cast=int)
safe_set("disc", meta.get("disc_number"), default=0, cast=int)
safe_set("isrc", meta.get("isrc"), default="")
safe_set("bpm", meta.get("bpm"), default=0, cast=int)
# --- Release date ---
release_date_str = meta.get("release_date")
release_date_obj = None
if release_date_str:
try:
release_date_obj = datetime.fromisoformat(release_date_str).date()
except ValueError:
try:
# fallback if only year string
release_date_obj = datetime(int(release_date_str[:4]), 1, 1).date()
except Exception:
pass
if release_date_obj:
f.date = release_date_obj
# --- Save all tags ---
f.save()
def cleanup_empty_dirs(root: Path):
"""
Recursively remove any directories under root that contain no files
(empty or only empty subdirectories).
"""
for dirpath, dirnames, filenames in os.walk(root, topdown=False):
p = Path(dirpath)
has_file = any(f.is_file() for f in p.rglob("*"))
if not has_file:
try:
p.rmdir()
except Exception:
pass
def sanitize_filename(name: str) -> str:
"""Make a string safe for file/dir names."""
if not name:
return "Unknown"
name = name.replace("/", "-").replace("\\", "-")
name = re.sub(r'[<>:"|?*\x00-\x1F]', "", name)
name = name.strip().strip(".")
name = re.sub(r"\s+", " ", name)
return name[:180] or "Unknown"
def ensure_unique_path(p: Path) -> Path:
"""
Ensure the given file or directory path is unique *within its parent folder*.
Only appends (2), (3)... if a real conflict exists in that folder.
"""
parent = p.parent
stem, suffix = p.stem, p.suffix
existing = {f.name for f in parent.glob(f"*{suffix}") if f.is_file()}
candidate = f"{stem}{suffix}"
if candidate not in existing:
return parent / candidate
counter = 2
while True:
candidate = f"{stem} ({counter}){suffix}"
if candidate not in existing:
return parent / candidate
counter += 1
# ---------- Job ----------
def bulk_download(track_list: list, quality: str = "FLAC"):
"""
RQ job:
- fetches stream URLs
- downloads with retries + throttling
- uses SR metadata to name/organize files
- creates ONE tarball for all tracks
- returns [tarball_path]
"""
job = get_current_job()
job_id = job.id if job else uuid.uuid4().hex
staging_root = ROOT_DIR / job_id
if job:
try:
job.meta["track_ids"] = [str(t) for t in (track_list or [])]
job.meta["tracks"] = []
job.meta["progress"] = 0
job.meta["tarball"] = None
job.meta["status"] = "Started"
job.save_meta()
except Exception as e:
logging.warning("Failed to init job.meta: %s", e)
async def process_tracks():
per_track_meta = []
all_final_files = []
all_artists = set()
(ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
async with aiohttp.ClientSession(headers=HEADERS) as session:
total = len(track_list or [])
logging.critical("Total tracks to process: %s", total)
if job:
job.meta["progress"] = 0
job.save_meta()
for i, track_id in enumerate(track_list or []):
track_info = {
"track_id": str(track_id),
"status": "Pending",
"file_path": None,
"error": None,
"attempts": 0,
}
attempt = 0
while attempt < MAX_RETRIES:
tmp_file = None
attempt += 1
track_info["attempts"] = attempt
try:
url = await sr.get_stream_url_by_track_id(track_id, quality)
if not url:
raise RuntimeError("No stream URL")
parsed = urlparse(url)
clean_path = unquote(parsed.path)
ext = Path(clean_path).suffix or ".mp3"
tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")
async with session.get(url) as resp:
resp.raise_for_status()
with open(tmp_file, "wb") as f:
async for chunk in resp.content.iter_chunked(64 * 1024):
f.write(chunk)
md = await sr.get_metadata_by_track_id(track_id) or {}
logging.info("Metadata for %s: %s", track_id, md)
artist_raw = md.get("artist") or "Unknown Artist"
album_raw = md.get("album") or "Unknown Album"
title_raw = md.get("title") or f"Track {track_id}"
artist = sanitize_filename(artist_raw)
album = sanitize_filename(album_raw)
title = sanitize_filename(title_raw)
all_artists.add(artist)
artist_dir = staging_root / artist
album_dir = artist_dir / album
album_dir.mkdir(parents=True, exist_ok=True)
final_file = ensure_unique_path(album_dir / f"{title}{ext}")
tag_with_mediafile(str(tmp_file), md)
tmp_file.rename(final_file)
tmp_file = None
track_info["status"] = "Success"
track_info["file_path"] = str(final_file)
track_info["error"] = None
all_final_files.append(final_file)
if job:
job.meta["progress"] = int(((i + 1) / total) * 100)
job.save_meta()
break
except aiohttp.ClientResponseError as e:
if e.status == 429:
wait_time = min(60, 2**attempt) # exponential up to 60s
logging.warning(
"Rate limited (429). Sleeping %s seconds", wait_time
)
await asyncio.sleep(wait_time)
else:
await asyncio.sleep(
random.uniform(THROTTLE_MIN, THROTTLE_MAX)
)
except Exception as e:
logging.error(
"Track %s attempt %s failed: %s", track_id, attempt, e
)
traceback.print_exc()
track_info["error"] = str(e)
if attempt >= MAX_RETRIES:
track_info["status"] = "Failed"
await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
finally:
try:
if tmp_file and tmp_file.exists():
tmp_file.unlink()
except Exception:
pass
per_track_meta.append(track_info)
if job:
try:
job.meta["tracks"] = per_track_meta
job.save_meta()
except Exception as e:
logging.warning(
"Failed to update job.meta after track %s: %s", track_id, e
)
await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
if not all_final_files:
if job:
try:
job.meta["tarball"] = None
job.meta["status"] = "Failed"
job.save_meta()
except Exception:
pass
return []
artist_counts: dict[str, int] = {}
for t in per_track_meta:
if t["status"] == "Success" and t.get("file_path"):
try:
artist = Path(t["file_path"]).relative_to(staging_root).parts[0]
except Exception:
artist = "Unknown Artist"
artist_counts[artist] = artist_counts.get(artist, 0) + 1
if artist_counts:
top_artist = sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[
0
][0]
else:
top_artist = "Unknown Artist"
combined_artist = sanitize_filename(top_artist)
staged_tarball = staging_root / f"{combined_artist}.tar.gz"
# Ensure uniqueness (Windows-style padding) within the parent folder
counter = 1
base_name = staged_tarball.stem
while staged_tarball.exists():
counter += 1
staged_tarball = staging_root / f"{base_name} ({counter}).tar.gz"
final_tarball = ROOT_DIR / "completed" / quality / staged_tarball.name
final_tarball.parent.mkdir(parents=True, exist_ok=True)
if job:
try:
job.meta["status"] = "Compressing"
job.save_meta()
except Exception:
pass
logging.info("Creating tarball: %s", staged_tarball)
def _create_tar_sync():
try:
subprocess.run(
[
"tar",
"-I",
"pigz -9",
"-cf",
str(staged_tarball),
"-C",
str(staging_root),
]
+ [str(f.relative_to(staging_root)) for f in all_final_files],
check=True,
)
for f in all_final_files:
try:
os.remove(f)
except Exception:
pass
except FileNotFoundError:
logging.warning("pigz not available, falling back to tarfile (slower).")
with tarfile.open(staged_tarball, "w:gz") as tar:
for f in all_final_files:
try:
arcname = f.relative_to(staging_root)
except ValueError:
arcname = f.name
tar.add(f, arcname=str(arcname))
try:
os.remove(f)
except Exception:
pass
await asyncio.to_thread(_create_tar_sync)
if not staged_tarball.exists():
logging.error("Tarball was not created: %s", staged_tarball)
if job:
try:
job.meta["status"] = "compress_failed"
job.save_meta()
except Exception:
pass
return []
logging.critical("Tarball created: %s", staged_tarball)
try:
staged_tarball.rename(final_tarball)
except Exception:
shutil.move(str(staged_tarball), str(final_tarball))
logging.critical("Tarball finalized: %s", final_tarball)
await asyncio.to_thread(shutil.rmtree, staging_root, ignore_errors=True)
if job:
job.meta["tarball"] = str(final_tarball)
job.meta["progress"] = 100
job.meta["status"] = "Completed"
job.save_meta()
return [str(final_tarball)]
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(process_tracks())
except Exception as e:
if job:
job.meta["status"] = "Failed"
job.save_meta()
logging.critical("Exception: %s", str(e))
finally:
loop.close()

401
utils/sr_wrapper.py Normal file
View File

@@ -0,0 +1,401 @@
from typing import Optional, Any
from uuid import uuid4
from urllib.parse import urlparse
import hashlib
import logging
import random
import asyncio
import os
import aiohttp
import time
from streamrip.client import TidalClient # type: ignore
from streamrip.config import Config as StreamripConfig # type: ignore
from dotenv import load_dotenv
load_dotenv()
class SRUtil:
"""
StreamRip API Utility Class
"""
def __init__(self) -> None:
"""Initialize StreamRip utility."""
self.streamrip_config = StreamripConfig.defaults()
self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "")
self.streamrip_config.session.tidal.access_token = os.getenv(
"tidal_access_token", ""
)
self.streamrip_config.session.tidal.refresh_token = os.getenv(
"tidal_refresh_token", ""
)
self.streamrip_config.session.tidal.token_expiry = os.getenv(
"tidal_token_expiry", ""
)
self.streamrip_config.session.tidal.country_code = os.getenv(
"tidal_country_code", ""
)
self.streamrip_config.session.tidal.quality = int(
os.getenv("tidal_default_quality", 2)
)
self.streamrip_config.session.conversion.enabled = False
self.streamrip_config.session.downloads.folder = os.getenv(
"tidal_download_folder", ""
)
self.streamrip_config
self.streamrip_client = TidalClient(self.streamrip_config)
self.MAX_CONCURRENT_METADATA_REQUESTS = 2
self.METADATA_RATE_LIMIT = 1.25
self.METADATA_SEMAPHORE = asyncio.Semaphore(self.MAX_CONCURRENT_METADATA_REQUESTS)
self.LAST_METADATA_REQUEST = 0
self.MAX_METADATA_RETRIES = 5
self.METADATA_ALBUM_CACHE: dict[str, dict] = {}
self.RETRY_DELAY = 1.0 # seconds between retries
async def rate_limited_request(self, func, *args, **kwargs):
async with self.METADATA_SEMAPHORE:
now = time.time()
elapsed = now - self.LAST_METADATA_REQUEST
if elapsed < self.METADATA_RATE_LIMIT:
await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed)
result = await func(*args, **kwargs)
self.last_request_time = time.time()
return result
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
deduped = {}
for entry in entries:
norm = entry[key].strip().lower()
if norm not in deduped:
deduped[norm] = entry
return list(deduped.values())
def format_duration(self, seconds):
if not seconds:
return None
m, s = divmod(seconds, 60)
return f"{m}:{s:02}"
def combine_album_track_metadata(
self, album_json: dict | None, track_json: dict
) -> dict:
"""
Combine album-level and track-level metadata into a unified tag dictionary.
Track-level metadata overrides album-level where relevant.
"""
album_json = album_json or {}
# Album-level
combined = {
"album": album_json.get("title"),
"album_artist": album_json.get("artist", {}).get("name"),
"release_date": album_json.get("releaseDate"),
"album_type": album_json.get("type"),
"total_tracks": album_json.get("numberOfTracks"),
"upc": album_json.get("upc"),
"album_copyright": album_json.get("copyright"),
"album_cover_id": album_json.get("cover"),
"album_cover_url": f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg"
if album_json.get("cover")
else None,
}
# Track-level (overrides or adds to album info)
combined.update(
{
"title": track_json.get("title"),
"artist": track_json.get("artist", {}).get("name"),
"artists": [a.get("name") for a in track_json.get("artists", [])],
"track_number": track_json.get("trackNumber"),
"disc_number": track_json.get("volumeNumber"),
"duration": track_json.get("duration"),
"isrc": track_json.get("isrc"),
"bpm": track_json.get("bpm"),
"explicit": track_json.get("explicit"),
"replaygain": track_json.get("replayGain"),
"peak": track_json.get("peak"),
"lyrics": track_json.get("lyrics"),
"track_copyright": track_json.get("copyright"),
"cover_id": track_json.get("album", {}).get("cover") or album_json.get("cover"),
"cover_url": (
f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg"
if (track_json.get("album", {}).get("cover") or album_json.get("cover"))
else None
),
}
)
return combined
def combine_album_with_all_tracks(
self, album_json: dict[str, Any]
) -> list[dict[str, Any]]:
"""Return a list of combined metadata dicts for all tracks in an album JSON."""
return [
self.combine_album_track_metadata(album_json, t)
for t in album_json.get("tracks", [])
]
async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
"""Get artist(s) by name.
Args:
artist_name (str): The name of the artist.
Returns:
Optional[dict]: The artist details or None if not found.
"""
try:
await self.streamrip_client.login()
except Exception as e:
logging.info("Login Exception: %s", str(e))
pass
artists_out: list[dict] = []
try:
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
except AttributeError:
await self.streamrip_client.login()
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
logging.critical("Artists output: %s", artists)
artists = artists[0].get("items", [])
if not artists:
logging.warning("No artist found for name: %s", artist_name)
return None
artists_out = [
{
"artist": res["name"],
"id": res["id"],
}
for res in artists
if "name" in res and "id" in res
]
artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates
return artists_out
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
"""Get albums by artist ID
Args:
artist_id (int): The ID of the artist.
Returns:
Optional[list[dict]]: List of albums or None if not found.
"""
artist_id_str: str = str(artist_id)
albums_out: list[dict] = []
try:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
)
except AttributeError:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
)
if not metadata:
logging.warning("No metadata found for artist ID: %s", artist_id)
return None
albums = self.dedupe_by_key("title", metadata.get("albums", []))
albums_out = [
{
"artist": ", ".join(artist["name"] for artist in album["artists"]),
"album": album["title"],
"id": album["id"],
"release_date": album.get("releaseDate", "Unknown"),
}
for album in albums
if "title" in album and "id" in album and "artists" in album
]
logging.debug("Retrieved albums: %s", albums_out)
return albums_out
async def get_tracks_by_album_id(
self, album_id: int, quality: str = "FLAC"
) -> Optional[list | dict]:
"""Get tracks by album ID
Args:
album_id (int): The ID of the album.
Returns:
Optional[list[dict]]: List of tracks or None if not found.
"""
album_id_str = str(album_id)
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=album_id_str, media_type="album"
)
if not metadata:
logging.warning("No metadata found for album ID: %s", album_id)
return None
track_list = metadata.get("tracks", [])
tracks_out: list[dict] = [
{
"id": track.get("id"),
"artist": track.get("artist").get("name"),
"title": track.get("title"),
"duration": self.format_duration(track.get("duration", 0)),
"version": track.get("version"),
"audioQuality": track.get("audioQuality"),
}
for track in track_list
]
return tracks_out
async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
"""Get track by artist and song name
Args:
artist (str): The name of the artist.
song (str): The name of the song.
Returns:
Optional[dict]: The track details or None if not found.
TODO: Reimplement using StreamRip
"""
return []
async def get_stream_url_by_track_id(
self, track_id: int, quality: str = "FLAC"
) -> Optional[str]:
"""Get stream URL by track ID
Args:
track_id (int): The ID of the track.
quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
Returns:
Optional[str]: The stream URL or None if not found.
"""
if quality not in ["FLAC", "Lossy"]:
logging.error("Invalid quality requested: %s", quality)
return None
quality_int: int = int(self.streamrip_config.session.tidal.quality)
match quality:
case "FLAC":
quality_int = 2
case "Lossy":
quality_int = 1
track_id_str: str = str(track_id)
await self.streamrip_client.login()
try:
logging.critical("Using quality_int: %s", quality_int)
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=quality_int
)
except AttributeError:
await self.streamrip_client.login()
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=quality_int
)
if not track:
logging.warning("No track found for ID: %s", track_id)
return None
stream_url = track.url
if not stream_url:
logging.warning("No stream URL found for track ID: %s", track_id)
return None
return stream_url
async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
"""
Fetch track + album metadata with retries, caching album data.
Returns combined metadata dict or None after exhausting retries.
"""
for attempt in range(1, self.MAX_METADATA_RETRIES + 1):
try:
await self.streamrip_client.login()
# Track metadata
metadata = await self.rate_limited_request(
self.streamrip_client.get_metadata, str(track_id), "track"
)
album_id = metadata.get("album", {}).get("id")
album_metadata = None
if album_id:
# Check cache first
if album_id in self.METADATA_ALBUM_CACHE:
album_metadata = self.METADATA_ALBUM_CACHE[album_id]
else:
album_metadata = await self.rate_limited_request(
self.streamrip_client.get_metadata, album_id, "album"
)
if not album_metadata:
return None
self.METADATA_ALBUM_CACHE[album_id] = album_metadata
# Combine track + album metadata
if not album_metadata:
return None
combined_metadata: dict = self.combine_album_track_metadata(
album_metadata, metadata
)
logging.info(
"Combined metadata for track ID %s (attempt %d): %s",
track_id,
attempt,
combined_metadata,
)
return combined_metadata
except Exception as e:
# Exponential backoff with jitter for 429 or other errors
delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
logging.warning(
"Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs",
track_id,
attempt,
self.MAX_METADATA_RETRIES,
str(e),
delay,
)
if attempt < self.MAX_METADATA_RETRIES:
await asyncio.sleep(delay)
else:
logging.error(
"Metadata fetch failed permanently for track %s after %d attempts",
track_id,
self.MAX_METADATA_RETRIES,
)
return None
async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str:
"""Download track
Args:
track_id (int)
quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
Returns:
bool
"""
try:
await self.streamrip_client.login()
track_url = await self.get_stream_url_by_track_id(track_id)
if not track_url:
return False
parsed_url = urlparse(track_url)
parsed_url_filename = os.path.basename(parsed_url.path)
parsed_url_ext = os.path.splitext(parsed_url_filename)[1]
unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
dl_folder_path = (
f"{self.streamrip_config.session.downloads.folder}/{unique}"
)
dl_path = f"{dl_folder_path}/{track_id}.{parsed_url_ext}"
async with aiohttp.ClientSession() as session:
async with session.get(
track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60)
) as resp:
resp.raise_for_status()
with open(dl_path, "wb") as f:
async for chunk in resp.content.iter_chunked(1024 * 64):
f.write(chunk)
return dl_path
except Exception as e:
logging.critical("Error: %s", str(e))
return False

35
utils/test.conf Normal file
View File

@@ -0,0 +1,35 @@
# -----------------------
# /m/m2/ PHP handler
location ~ ^/m/m2/(.+\.php)$ {
alias /storage/music2/completed/;
include fastcgi_params;
fastcgi_pass unix:/run/php/php8.2-fpm.sock;
fastcgi_param SCRIPT_FILENAME /storage/music2/completed/$1;
fastcgi_param DOCUMENT_ROOT /storage/music2/completed;
fastcgi_param SCRIPT_NAME /m/m2/$1;
}
# /m/m2/ static files
location /m/m2/ {
alias /storage/music2/completed/;
index index.php;
try_files $uri $uri/ /index.php$is_args$args;
}
# -----------------------
# /m/ PHP handler
location ~ ^/m/(.+\.php)$ {
root /var/www/codey.lol/new/public;
include fastcgi_params;
fastcgi_pass unix:/run/php/php8.2-fpm.sock;
fastcgi_param SCRIPT_FILENAME $document_root/$1;
fastcgi_param DOCUMENT_ROOT $document_root;
fastcgi_param SCRIPT_NAME /m/$1;
}
# /m/ static files
location /m/ {
root /var/www/codey.lol/new/public;
index index.php;
try_files $uri $uri/ /m/index.php$is_args$args;
}