docstrings / formatting

This commit is contained in:
2025-09-23 13:17:34 -04:00
parent c2044711fb
commit 19afb287cd
16 changed files with 1165 additions and 428 deletions

View File

@@ -15,7 +15,10 @@ class LastFMException(Exception):
class ValidArtistSearchRequest(BaseModel):
"""
- **a**: artist name
Request model for searching an artist by name.
Attributes:
- **a** (str): Artist name.
"""
a: str
@@ -33,8 +36,11 @@ class ValidArtistSearchRequest(BaseModel):
class ValidAlbumDetailRequest(BaseModel):
"""
- **a**: artist name
- **release**: album/release name (as sourced from here/LastFM)
Request model for album details.
Attributes:
- **a** (str): Artist name.
- **release** (str): Album/release name.
"""
a: str
@@ -54,8 +60,11 @@ class ValidAlbumDetailRequest(BaseModel):
class ValidTrackInfoRequest(BaseModel):
"""
- **a**: artist name
- **t**: track
Request model for track info.
Attributes:
- **a** (str): Artist name.
- **t** (str): Track name.
"""
a: str
@@ -80,7 +89,10 @@ Rand Msg
class RandMsgRequest(BaseModel):
"""
- **short**: Short randmsg?
Request model for random message.
Attributes:
- **short** (Optional[bool]): Short randmsg?
"""
short: Optional[bool] = False
@@ -93,7 +105,10 @@ YT
class ValidYTSearchRequest(BaseModel):
"""
- **t**: title to search
Request model for YouTube search.
Attributes:
- **t** (str): Title to search.
"""
t: str = "rick astley - never gonna give you up"
@@ -106,7 +121,10 @@ Transcriptions
class ValidShowEpisodeListRequest(BaseModel):
"""
- **s**: show id
Request model for show episode list.
Attributes:
- **s** (int): Show ID.
"""
s: int
@@ -114,8 +132,11 @@ class ValidShowEpisodeListRequest(BaseModel):
class ValidShowEpisodeLineRequest(BaseModel):
"""
- **s**: show id
- **e**: episode id
Request model for show episode lines.
Attributes:
- **s** (int): Show ID.
- **e** (int): Episode ID.
"""
s: int
@@ -129,14 +150,17 @@ Lyric Search
class ValidLyricRequest(BaseModel):
"""
- **a**: artist
- **s**: song
- **t**: track (artist and song combined) [used only if a & s are not used]
- **extra**: include extra details in response [optional, default: false]
- **lrc**: Request LRCs?
- **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional]
- **src**: the script/utility which initiated the request
- **excluded_sources**: sources to exclude (new only)
Request model for lyric search.
Attributes:
- **a** (Optional[str]): Artist.
- **s** (Optional[str]): Song.
- **t** (Optional[str]): Track (artist and song combined).
- **extra** (Optional[bool]): Include extra details in response.
- **lrc** (Optional[bool]): Request LRCs?
- **sub** (Optional[str]): Text to search within lyrics.
- **src** (str): The script/utility which initiated the request.
- **excluded_sources** (Optional[list]): Sources to exclude.
"""
a: Optional[str] = None
@@ -166,7 +190,10 @@ class ValidLyricRequest(BaseModel):
class ValidTypeAheadRequest(BaseModel):
"""
- **query**: query string
Request model for typeahead query.
Attributes:
- **query** (str): Query string.
"""
query: str
@@ -183,11 +210,15 @@ class RadioException(Exception):
class ValidRadioSongRequest(BaseModel):
"""
- **key**: API Key
- **artist**: artist to search
- **song**: song to search
- **artistsong**: may be used IN PLACE OF artist/song to perform a combined/string search in the format "artist - song"
- **alsoSkip**: Whether to skip immediately to this track [not implemented]
Request model for radio song request.
Attributes:
- **key** (str): API Key.
- **artist** (Optional[str]): Artist to search.
- **song** (Optional[str]): Song to search.
- **artistsong** (Optional[str]): Combined artist-song string.
- **alsoSkip** (Optional[bool]): Whether to skip immediately to this track.
- **station** (Station): Station name.
"""
key: str
@@ -200,7 +231,10 @@ class ValidRadioSongRequest(BaseModel):
class ValidRadioTypeaheadRequest(BaseModel):
"""
- **query**: Typeahead query
Request model for radio typeahead.
Attributes:
- **query** (str): Typeahead query.
"""
query: str
@@ -208,8 +242,11 @@ class ValidRadioTypeaheadRequest(BaseModel):
class ValidRadioQueueGetRequest(BaseModel):
"""
- **key**: API key (optional, needed if specifying a non-default limit)
- **limit**: optional, default: 15k
Request model for radio queue get.
Attributes:
- **key** (Optional[str]): API key.
- **limit** (Optional[int]): Result limit.
"""
key: Optional[str] = None
@@ -218,9 +255,12 @@ class ValidRadioQueueGetRequest(BaseModel):
class ValidRadioNextRequest(BaseModel):
"""
- **key**: API Key
- **skipTo**: UUID to skip to [optional]
- **station**: Station (default: "main")
Request model for radio next track.
Attributes:
- **key** (str): API Key.
- **skipTo** (Optional[str]): UUID to skip to.
- **station** (Station): Station name.
"""
key: str
@@ -230,17 +270,23 @@ class ValidRadioNextRequest(BaseModel):
class ValidRadioReshuffleRequest(ValidRadioNextRequest):
"""
- **key**: API Key
- **station**: Station (default: "main")
Request model for radio reshuffle.
Attributes:
- **key** (str): API Key.
- **station** (Station): Station name.
"""
class ValidRadioQueueRequest(BaseModel):
"""
- **draw**: DataTables draw count, default 1
- **start**: paging start position, default 0
- **search**: Optional search query
- **station**: Station (default: "main")
Request model for radio queue.
Attributes:
- **draw** (Optional[int]): DataTables draw count.
- **start** (Optional[int]): Paging start position.
- **search** (Optional[str]): Search query.
- **station** (Station): Station name.
"""
draw: Optional[int] = 1
@@ -251,10 +297,13 @@ class ValidRadioQueueRequest(BaseModel):
class ValidRadioQueueShiftRequest(BaseModel):
"""
- **key**: API Key
- **uuid**: UUID to shift
- **next**: Play next if true, immediately if false, default False
- **station**: Station (default: "main")
Request model for radio queue shift.
Attributes:
- **key** (str): API Key.
- **uuid** (str): UUID to shift.
- **next** (Optional[bool]): Play next if true.
- **station** (Station): Station name.
"""
key: str
@@ -265,9 +314,12 @@ class ValidRadioQueueShiftRequest(BaseModel):
class ValidRadioQueueRemovalRequest(BaseModel):
"""
- **key**: API Key
- **uuid**: UUID to remove
- **station**: Station (default: "main")
Request model for radio queue removal.
Attributes:
- **key** (str): API Key.
- **uuid** (str): UUID to remove.
- **station** (Station): Station name.
"""
key: str

View File

@@ -17,6 +17,7 @@ class LastFM(FastAPI):
"""Last.FM Endpoints"""
def __init__(self, app: FastAPI, util, constants) -> None:
"""Initialize LastFM endpoints."""
self.app: FastAPI = app
self.util = util
self.constants = constants
@@ -44,8 +45,13 @@ class LastFM(FastAPI):
self, data: ValidArtistSearchRequest
) -> JSONResponse:
"""
Get artist info
- **a**: Artist to search
Get artist information by name.
Parameters:
- **data** (ValidArtistSearchRequest): Request containing artist name.
Returns:
- **JSONResponse**: Contains artist information or an error message.
"""
artist: Optional[str] = data.a.strip()
if not artist:
@@ -81,8 +87,13 @@ class LastFM(FastAPI):
self, data: ValidArtistSearchRequest
) -> JSONResponse:
"""
Get artist's albums/releases
- **a**: Artist to search
Get artist's albums/releases.
Parameters:
- **data** (ValidArtistSearchRequest): Request containing artist name.
Returns:
- **JSONResponse**: Contains a list of albums or an error message.
"""
artist: str = data.a.strip()
if not artist:
@@ -121,9 +132,13 @@ class LastFM(FastAPI):
self, data: ValidAlbumDetailRequest
) -> JSONResponse:
"""
Get details of a particular release by an artist
- **a**: Artist to search
- **release**: Release title to search
Get details of a particular release by an artist.
Parameters:
- **data** (ValidAlbumDetailRequest): Request containing artist and release name.
Returns:
- **JSONResponse**: Release details or error.
"""
artist: str = data.a.strip()
release: str = data.release.strip()
@@ -157,9 +172,13 @@ class LastFM(FastAPI):
self, data: ValidAlbumDetailRequest
) -> JSONResponse:
"""
Get track list for a particular release by an artist
- **a**: Artist to search
- **release**: Release title to search
Get track list for a particular release by an artist.
Parameters:
- **data** (ValidAlbumDetailRequest): Request containing artist and release name.
Returns:
- **JSONResponse**: Track list or error.
"""
artist: str = data.a.strip()
release: str = data.release.strip()
@@ -189,9 +208,13 @@ class LastFM(FastAPI):
async def track_info_handler(self, data: ValidTrackInfoRequest) -> JSONResponse:
"""
Get track info from Last.FM given an artist/track
- **a**: Artist to search
- **t**: Track title to search
Get track info from Last.FM given an artist/track.
Parameters:
- **data** (ValidTrackInfoRequest): Request containing artist and track name.
Returns:
- **JSONResponse**: Track info or error.
"""
try:
artist: str = data.a

View File

@@ -20,12 +20,21 @@ class CacheUtils:
"""
def __init__(self) -> None:
"""Initialize CacheUtils."""
self.lyrics_db_path: LiteralString = os.path.join(
"/usr/local/share", "sqlite_dbs", "cached_lyrics.db"
)
async def check_typeahead(self, query: str) -> Optional[list[str]]:
"""Lyric Search Typeahead DB Handler"""
"""
Check typeahead suggestions for lyric search.
Args:
query: The search query.
Returns:
List of matching artist-song strings, or None if query is empty.
"""
if not query:
return None
async with sqlite3.connect(self.lyrics_db_path, timeout=1) as _db:
@@ -46,6 +55,7 @@ class LyricSearch(FastAPI):
"""
def __init__(self, app: FastAPI, util, constants) -> None:
"""Initialize LyricSearch endpoints."""
self.app: FastAPI = app
self.util = util
self.constants = constants
@@ -92,8 +102,13 @@ class LyricSearch(FastAPI):
async def typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse:
"""
Lyric search typeahead handler
- **query**: Typeahead query
Handle lyric search typeahead requests.
Parameters:
- **data** (ValidTypeAheadRequest): Request containing the query.
Returns:
- **JSONResponse**: Typeahead suggestions or error.
"""
if not isinstance(data.query, str):
return JSONResponse(
@@ -112,15 +127,13 @@ class LyricSearch(FastAPI):
async def lyric_search_handler(self, data: ValidLyricRequest) -> JSONResponse:
"""
Search for lyrics
- **a**: artist
- **s**: song
- **t**: track (artist and song combined) [used only if a & s are not used]
- **extra**: include extra details in response [optional, default: false]
- **lrc**: Request LRCs?
- **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none]
- **src**: the script/utility which initiated the request
- **excluded_sources**: sources to exclude [optional, default: none]
Search for lyrics.
Parameters:
- **data** (ValidLyricRequest): Request containing artist, song, and other parameters.
Returns:
- **JSONResponse**: Lyrics data or error.
"""
if (not data.a or not data.s) and not data.t or not data.src:
raise HTTPException(detail="Invalid request", status_code=500)

View File

@@ -6,10 +6,11 @@ from utils.meme_util import MemeUtil
class Meme(FastAPI):
"""
Misc Endpoints
Meme Endpoints
"""
def __init__(self, app: FastAPI, my_util, constants) -> None:
"""Initialize Meme endpoints."""
self.app: FastAPI = app
self.util = my_util
self.meme_util = MemeUtil(constants)
@@ -35,21 +36,47 @@ class Meme(FastAPI):
)
async def get_meme_by_id(self, id: int, request: Request) -> Response:
"""Get meme (image) by id"""
"""
Get meme image by ID.
Parameters:
- **id** (int): Meme ID.
- **request** (Request): The request object.
Returns:
- **Response**: Image response or 404.
"""
meme_image = await self.meme_util.get_meme_by_id(id)
if not meme_image:
return Response(status_code=404, content="Not found")
return Response(content=meme_image, media_type="image/png")
async def random_meme(self, request: Request) -> Response:
"""Get random meme (image)"""
"""
Get a random meme image.
Parameters:
- **request** (Request): The request object.
Returns:
- **Response**: Image response or 404.
"""
meme_image = await self.meme_util.get_random_meme()
if not meme_image:
return Response(status_code=404, content="Not found")
return Response(content=meme_image, media_type="image/png")
async def list_memes(self, page: int, request: Request) -> Response:
"""Get meme (image) by id"""
"""
List memes with pagination.
Parameters:
- **page** (int): Page number.
- **request** (Request): The request object.
Returns:
- **JSONResponse**: List of memes with paging info.
"""
meme_list = await self.meme_util.list_memes(page)
page_count = await self.meme_util.get_page_count()
return JSONResponse(

View File

@@ -24,6 +24,7 @@ class Misc(FastAPI):
"""
def __init__(self, app: FastAPI, my_util, constants, radio) -> None:
"""Initialize Misc endpoints."""
self.app: FastAPI = app
self.util = my_util
self.constants = constants
@@ -84,12 +85,29 @@ class Misc(FastAPI):
return "No."
async def no(self) -> JSONResponse:
"""NaaS"""
"""
Get a random 'no' reason.
Returns:
- **JSONResponse**: Contains a random 'no' reason.
"""
return JSONResponse(content={"no": self.get_no()})
async def upload_activity_image(
self, image: UploadFile, key: Annotated[str, Form()], request: Request
) -> Response:
"""
Upload activity image.
Parameters:
- **image** (UploadFile): The uploaded image file.
- **key** (str): The API key for authentication.
- **request** (Request): The HTTP request object.
Returns:
- **Response**: Indicates success or failure of the upload.
"""
if (
not key
or not isinstance(key, str)
@@ -112,6 +130,15 @@ class Misc(FastAPI):
)
async def get_activity_image(self, request: Request) -> Response:
"""
Get the activity image.
Parameters:
- **request** (Request): The HTTP request object.
Returns:
- **Response**: The activity image or a fallback image.
"""
if isinstance(self.activity_image, bytes):
return Response(content=self.activity_image, media_type="image/png")
@@ -125,11 +152,13 @@ class Misc(FastAPI):
async def get_radio_np(self, station: str = "main") -> tuple[str, str, str]:
"""
Get radio now playing
Get radio now playing info.
Args:
None
station: Station name.
Returns:
str: Radio now playing in artist - song format
Tuple of (artistsong, album, genre).
"""
np: dict = self.radio.radio_util.now_playing[station]
@@ -145,7 +174,10 @@ class Misc(FastAPI):
async def homepage_redis_widget(self) -> JSONResponse:
"""
Homepage Redis Widget Handler
Get Redis stats for homepage widget.
Returns:
- **JSONResponse**: Contains Redis stats.
"""
# Measure response time w/ test lyric search
time_start: float = time.time() # Start time for response_time
@@ -169,7 +201,10 @@ class Misc(FastAPI):
async def homepage_rq_widget(self) -> JSONResponse:
"""
Homepage RQ Widget Handler
Get RQ job stats for homepage widget.
Returns:
- **JSONResponse**: Contains RQ job stats.
"""
queue_name = "dls"
queue = Queue(queue_name, self.redis_client)
@@ -193,7 +228,10 @@ class Misc(FastAPI):
async def homepage_sqlite_widget(self) -> JSONResponse:
"""
Homepage SQLite Widget Handler
Get SQLite stats for homepage widget.
Returns:
- **JSONResponse**: Contains SQLite stats.
"""
row_count: int = await self.lyr_cache.sqlite_rowcount()
distinct_artists: int = await self.lyr_cache.sqlite_distinct("artist")
@@ -208,7 +246,10 @@ class Misc(FastAPI):
async def homepage_lyrics_widget(self) -> JSONResponse:
"""
Homepage Lyrics Widget Handler
Get lyrics stats for homepage widget.
Returns:
- **JSONResponse**: Contains lyrics stats.
"""
found_counts: Optional[dict] = await self.redis_cache.get_found_counts()
if not isinstance(found_counts, dict):
@@ -228,7 +269,13 @@ class Misc(FastAPI):
async def homepage_radio_widget(self, station: str = "main") -> JSONResponse:
"""
Homepage Radio Widget Handler
Get radio now playing for homepage widget.
Parameters:
- **station** (str): Station name. Defaults to "main".
Returns:
- **JSONResponse**: Contains now playing info.
"""
radio_np: tuple = await self.get_radio_np(station)
if not radio_np:

View File

@@ -68,10 +68,14 @@ class Radio(FastAPI):
self, data: ValidRadioNextRequest, request: Request
) -> JSONResponse:
"""
Skip to the next track in the queue, or to uuid specified in skipTo if provided
- **key**: API key
- **skipTo**: Optional UUID to skip to
- **station**: default "main"
Skip to the next track in the queue, or to the UUID specified in `skipTo` if provided.
Parameters:
- **data** (ValidRadioNextRequest): Contains the API key, optional UUID to skip to, and station name.
- **request** (Request): The HTTP request object.
Returns:
- **JSONResponse**: Indicates success or failure of the skip operation.
"""
try:
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
@@ -114,10 +118,16 @@ class Radio(FastAPI):
self, data: ValidRadioReshuffleRequest, request: Request
) -> JSONResponse:
"""
Reshuffle the play queue
- **key**: API key
- **station**: default "main"
Reshuffle the play queue.
Parameters:
- **data** (ValidRadioReshuffleRequest): Contains the API key and station name.
- **request** (Request): The HTTP request object.
Returns:
- **JSONResponse**: Indicates success of the reshuffle operation.
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
@@ -130,7 +140,14 @@ class Radio(FastAPI):
data: Optional[ValidRadioQueueRequest] = None,
) -> JSONResponse:
"""
Get current play queue (paged, 20 results per page)
Get the current play queue (paged, 20 results per page).
Parameters:
- **request** (Request): The HTTP request object.
- **data** (Optional[ValidRadioQueueRequest]): Contains the station name and optional search query.
Returns:
- **JSONResponse**: Contains the paged queue data.
"""
if not (data and data.station):
return JSONResponse(status_code=500,
@@ -191,13 +208,16 @@ class Radio(FastAPI):
self, data: ValidRadioQueueShiftRequest, request: Request
) -> JSONResponse:
"""
Shift position of a UUID within the queue
[currently limited to playing next or immediately]
- **key**: API key
- **uuid**: UUID to shift
- **next**: Play track next? If False, skips to the track
- **station**: default "main"
Shift the position of a UUID within the queue.
Parameters:
- **data** (ValidRadioQueueShiftRequest): Contains the API key, UUID to shift, and station name.
- **request** (Request): The HTTP request object.
Returns:
- **JSONResponse**: Indicates success of the shift operation.
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
@@ -225,11 +245,16 @@ class Radio(FastAPI):
self, data: ValidRadioQueueRemovalRequest, request: Request
) -> JSONResponse:
"""
Remove an item from the current play queue
- **key**: API key
- **uuid**: UUID of queue item to remove
- **station**: default "main"
Remove an item from the current play queue.
Parameters:
- **data** (ValidRadioQueueRemovalRequest): Contains the API key, UUID of the item to remove, and station name.
- **request** (Request): The HTTP request object.
Returns:
- **JSONResponse**: Indicates success of the removal operation.
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
@@ -254,10 +279,15 @@ class Radio(FastAPI):
station: Station = "main"
) -> Response:
"""
Get album art, optional parameter track_id may be specified.
Otherwise, current track album art will be pulled.
- **track_id**: Optional, if provided, will attempt to retrieve the album art of this track_id. Current track used otherwise.
- **station**: default "main"
Get album art for the current or specified track.
Parameters:
- **request** (Request): The HTTP request object.
- **track_id** (Optional[int]): ID of the track to retrieve album art for. Defaults to the current track.
- **station** (Station): Name of the station. Defaults to "main".
Returns:
- **Response**: Contains the album art image or a default image.
"""
try:
if not track_id:
@@ -288,9 +318,16 @@ class Radio(FastAPI):
async def radio_now_playing(self, request: Request,
station: Station = "main") -> JSONResponse:
"""
Get currently playing track info
- **station**: default "main"
Get information about the currently playing track.
Parameters:
- **request** (Request): The HTTP request object.
- **station** (Station): Name of the station. Defaults to "main".
Returns:
- **JSONResponse**: Contains the track information.
"""
ret_obj: dict = {**self.radio_util.now_playing[station]}
ret_obj["station"] = station
try:
@@ -308,12 +345,17 @@ class Radio(FastAPI):
background_tasks: BackgroundTasks,
) -> JSONResponse:
"""
Get next track
(Track will be removed from the queue in the process.)
- **key**: API key
- **skipTo**: Optional UUID to skip to
- **station**: default: "main"
Get the next track in the queue. The track will be removed from the queue in the process.
Parameters:
- **data** (ValidRadioNextRequest): Contains the API key, optional UUID to skip to, and station name.
- **request** (Request): The HTTP request object.
- **background_tasks** (BackgroundTasks): Background tasks for webhook execution.
Returns:
- **JSONResponse**: Contains the next track information.
"""
logging.info("Radio get next")
if data.station not in self.radio_util.active_playlist.keys():
raise HTTPException(status_code=500, detail="No such station/not ready")
@@ -372,14 +414,16 @@ class Radio(FastAPI):
self, data: ValidRadioSongRequest, request: Request
) -> JSONResponse:
"""
Song request handler
- **key**: API key
- **artist**: Artist to search
- **song**: Song to search
- **artistsong**: Optional "Artist - Song" pair to search, in place of artist/song
- **alsoSkip**: If True, skips to the track; otherwise, track will be placed next up in queue
- **station**: default "main"
Handle song requests.
Parameters:
- **data** (ValidRadioSongRequest): Contains the API key, artist, song, and station name.
- **request** (Request): The HTTP request object.
Returns:
- **JSONResponse**: Indicates success or failure of the request.
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
artistsong: Optional[str] = data.artistsong
@@ -413,8 +457,14 @@ class Radio(FastAPI):
self, data: ValidRadioTypeaheadRequest, request: Request
) -> JSONResponse:
"""
Radio typeahead handler
- **query**: Typeahead query
Handle typeahead queries for the radio.
Parameters:
- **data** (ValidRadioTypeaheadRequest): Contains the typeahead query.
- **request** (Request): The HTTP request object.
Returns:
- **JSONResponse**: Contains the typeahead results.
"""
if not isinstance(data.query, str):
return JSONResponse(

View File

@@ -14,6 +14,7 @@ class RandMsg(FastAPI):
"""
def __init__(self, app: FastAPI, util, constants) -> None:
"""Initialize RandMsg endpoint."""
self.app: FastAPI = app
self.util = util
self.constants = constants
@@ -30,8 +31,13 @@ class RandMsg(FastAPI):
self, data: Optional[RandMsgRequest] = None
) -> JSONResponse:
"""
Get a randomly generated message
- **short**: Optional, if True, will limit length of returned random messages to <=126 characters (Discord restriction related)
Get a randomly generated message.
Parameters:
- **data** (Optional[RandMsgRequest]): Optional request data for short messages.
Returns:
- **JSONResponse**: Contains a random message.
"""
random.seed()
short: Optional[bool] = False

View File

@@ -33,6 +33,7 @@ class RIP(FastAPI):
"""
def __init__(self, app: FastAPI, my_util, constants) -> None:
"""Initialize RIP endpoints."""
self.app: FastAPI = app
self.util = my_util
self.trip_util = SRUtil()
@@ -72,7 +73,15 @@ class RIP(FastAPI):
)
def _format_job(self, job: Job):
"""Helper to normalize job data into JSON."""
"""
Helper to normalize job data into JSON.
Parameters:
- job (Job): The job object to format.
Returns:
- dict: Contains normalized job data.
"""
job_status: str | JobStatus = job.get_status()
progress = job.meta.get("progress", 0)
if progress == 100 and not job.meta.get("tarball"):
@@ -82,7 +91,13 @@ class RIP(FastAPI):
tracks_out = len(job.meta.get("tracks", []))
# `utils/rip_background.py` sets per-track status to 'Success' or 'Failed'
# so check for 'success' case-insensitively and count matches.
succeeded_tracks = len([t for t in job.meta.get("tracks", []) if str(t.get("status", "")).lower() == "success"])
succeeded_tracks = len(
[
t
for t in job.meta.get("tracks", [])
if str(t.get("status", "")).lower() == "success"
]
)
return {
"id": job.id,
@@ -103,10 +118,20 @@ class RIP(FastAPI):
async def artists_by_name_handler(
self, artist: str, request: Request, user=Depends(get_current_user)
) -> Response:
"""Get artists by name"""
"""
Get artists by name.
Parameters:
- **artist** (str): Artist name.
- **request** (Request): The request object.
- **user**: Current user (dependency).
Returns:
- **Response**: JSON response with artists or 404.
"""
# support optional grouping to return one primary per display name
# with `alternatives` for disambiguation (use ?group=true)
group = bool(request.query_params.get("group", False))
group = bool(request.query_params.get("group", False))
artists = await self.trip_util.get_artists_by_name(artist, group=group)
if not artists:
return Response(status_code=404, content="Not found")
@@ -115,7 +140,17 @@ class RIP(FastAPI):
async def albums_by_artist_id_handler(
self, artist_id: int, request: Request, user=Depends(get_current_user)
) -> Response:
"""Get albums by artist ID"""
"""
Get albums by artist ID.
Parameters:
- **artist_id** (int): Artist ID.
- **request** (Request): The request object.
- **user**: Current user (dependency).
Returns:
- **Response**: JSON response with albums or 404.
"""
albums = await self.trip_util.get_albums_by_artist_id(artist_id)
if not albums:
return Response(status_code=404, content="Not found")
@@ -128,7 +163,18 @@ class RIP(FastAPI):
user=Depends(get_current_user),
quality: Literal["FLAC", "Lossy"] = "FLAC",
) -> Response:
"""Get tracks by album id"""
"""
Get tracks by album ID.
Parameters:
- **album_id** (int): Album ID.
- **request** (Request): The request object.
- **quality** (Literal): Track quality ("FLAC" or "Lossy").
- **user**: Current user (dependency).
Returns:
- **Response**: JSON response with tracks or 404.
"""
tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality)
if not tracks:
return Response(status_code=404, content="Not Found")
@@ -137,7 +183,18 @@ class RIP(FastAPI):
async def tracks_by_artist_song_handler(
self, artist: str, song: str, request: Request, user=Depends(get_current_user)
) -> Response:
"""Get tracks by artist and song name"""
"""
Get tracks by artist and song name.
Parameters:
- **artist** (str): Artist name.
- **song** (str): Song name.
- **request** (Request): The request object.
- **user**: Current user (dependency).
Returns:
- **Response**: JSON response with tracks or 404.
"""
logging.critical("Searching for tracks by artist: %s, song: %s", artist, song)
tracks = await self.trip_util.get_tracks_by_artist_song(artist, song)
if not tracks:
@@ -151,7 +208,18 @@ class RIP(FastAPI):
quality: Literal["FLAC", "Lossy"] = "FLAC",
user=Depends(get_current_user),
) -> Response:
"""Get track by ID"""
"""
Get track by ID.
Parameters:
- **track_id** (int): Track ID.
- **request** (Request): The request object.
- **quality** (Literal): Track quality ("FLAC" or "Lossy").
- **user**: Current user (dependency).
Returns:
- **Response**: JSON response with stream URL or 404.
"""
track = await self.trip_util.get_stream_url_by_track_id(track_id, quality)
if not track:
return Response(status_code=404, content="Not found")
@@ -163,7 +231,17 @@ class RIP(FastAPI):
request: Request,
user=Depends(get_current_user),
) -> Response:
"""Bulk fetch a list of track IDs"""
"""
Bulk fetch a list of track IDs.
Parameters:
- **data** (ValidBulkFetchRequest): Bulk fetch request data.
- **request** (Request): The request object.
- **user**: Current user (dependency).
Returns:
- **Response**: JSON response with job info or error.
"""
if not data or not data.track_ids or not data.target:
return JSONResponse(
content={
@@ -204,7 +282,17 @@ class RIP(FastAPI):
async def job_status_handler(
self, job_id: str, request: Request, user=Depends(get_current_user)
):
"""Get status and result of a single job"""
"""
Get status and result of a single job.
Parameters:
- **job_id** (str): Job ID.
- **request** (Request): The request object.
- **user**: Current user (dependency).
Returns:
- **JSONResponse**: Job status and result or error.
"""
job = None
try:
@@ -233,7 +321,16 @@ class RIP(FastAPI):
return self._format_job(job)
async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
"""List all jobs across all registries (queued, started, finished, failed, etc)."""
"""
List all jobs across all registries.
Parameters:
- **request** (Request): The request object.
- **user**: Current user (dependency).
Returns:
- **JSONResponse**: List of jobs.
"""
jobs_info = []
seen = set()

View File

@@ -13,6 +13,7 @@ class Transcriptions(FastAPI):
"""
def __init__(self, app: FastAPI, util, constants) -> None:
"""Initialize Transcriptions endpoints."""
self.app: FastAPI = app
self.util = util
self.constants = constants
@@ -36,8 +37,13 @@ class Transcriptions(FastAPI):
self, data: ValidShowEpisodeListRequest
) -> JSONResponse:
"""
Get list of episodes by show id
- **s**: Show ID to query
Get list of episodes by show ID.
Parameters:
- **data** (ValidShowEpisodeListRequest): Request containing show ID.
Returns:
- **JSONResponse**: Contains a list of episodes.
"""
show_id: int = data.s
db_path: Optional[Union[str, LiteralString]] = None
@@ -106,9 +112,13 @@ class Transcriptions(FastAPI):
self, data: ValidShowEpisodeLineRequest
) -> Response:
"""
Get lines for a particular episode
- **s**: Show ID to query
- **e**: Episode ID to query
Get lines for a particular episode.
Parameters:
- **data** (ValidShowEpisodeLineRequest): Request containing show and episode ID.
Returns:
- **Response**: Episode lines.
"""
show_id: int = int(data.s)
episode_id: int = int(data.e)

View File

@@ -12,6 +12,7 @@ class YT(FastAPI):
"""
def __init__(self, app: FastAPI, util, constants) -> None:
"""Initialize YT endpoints."""
self.app: FastAPI = app
self.util = util
self.constants = constants
@@ -31,7 +32,15 @@ class YT(FastAPI):
)
async def yt_video_search_handler(self, data: ValidYTSearchRequest) -> JSONResponse:
"""Search for YT Video by Title (closest match returned)"""
"""
Search for YouTube video by title.
Parameters:
- **data** (ValidYTSearchRequest): Request containing title.
Returns:
- **JSONResponse**: Contains video ID or an error message.
"""
title: str = data.t
yts_res: Optional[list[dict]] = await self.ytsearch.search(title)

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,7 @@
import asyncio
import logging
import sys
sys.path.insert(0, "..")
from utils.sr_wrapper import SRUtil
@@ -8,11 +9,16 @@ from utils.sr_wrapper import SRUtil
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)
async def main():
sr = SRUtil()
artist_search = await sr.get_artists_by_name("Ren")
# logging.critical("Artist search: %s", artist_search)
res = [dict(x) for x in artist_search if x.get('popularity', 0) and x.get('artist').lower() == 'ren']
res = [
dict(x)
for x in artist_search
if x.get("popularity", 0) and x.get("artist").lower() == "ren"
]
logging.critical("Results: %s", res)
# search_res = await sr.get_album_by_name(artist[:8], album)
# logging.critical("Search result: %s", search_res)
@@ -23,4 +29,4 @@ async def main():
return
asyncio.run(main())
asyncio.run(main())

30
util.py
View File

@@ -18,23 +18,45 @@ class Utilities:
def get_blocked_response(self, path: Optional[str] = None):
"""
Get Blocked HTTP Response
Return a redirect response for blocked requests.
Args:
path (Optional[str]): The requested path (currently unused).
Returns:
RedirectResponse: A redirect to the blocked URI.
"""
logging.error("Rejected request: Blocked")
return RedirectResponse(url=self.blocked_redirect_uri)
def get_no_endpoint_found(self, path: Optional[str] = None):
"""
Get 404 Response
Raise an HTTP 404 exception for unknown endpoints.
Args:
path (Optional[str]): The requested path (currently unused).
Raises:
HTTPException: With status code 404 and detail "Unknown endpoint".
"""
logging.error("Rejected request: No such endpoint")
raise HTTPException(detail="Unknown endpoint", status_code=404)
def check_key(self, path: str, key: str, req_type: int = 0) -> bool:
"""
Accepts path as an argument to allow fine tuning access for each API key, not currently in use.
"""
Check if the provided API key is valid and meets the requirements.
Args:
path (str): The request path (reserved for future fine-tuning).
key (str): The authorization header value, expected to start with "Bearer ".
req_type (int): The type of access required.
0: Basic access.
2: Private access (key must start with "PRV-").
4: Radio access (key must start with "RAD-").
Returns:
bool: True if the key is valid and meets the requirements, False otherwise.
"""
if not key or not key.startswith("Bearer "):
return False

View File

@@ -223,6 +223,7 @@ class RadioUtil:
"artist": double_space.sub(" ", result["artist"].strip()),
"song": double_space.sub(" ", result["song"].strip()),
"artistsong": result["artistsong"].strip(),
"album": result["album"].strip() if result["album"] else "N/A",
"genre": self.get_genre(
double_space.sub(" ", result["artist"].strip())
),

View File

@@ -47,7 +47,13 @@ sr = SRUtil()
# ---------- Discord helper ----------
async def discord_notify(webhook_url: str, title: str, description: str, target: Optional[str] = None, color: int = 0x00FF00):
async def discord_notify(
webhook_url: str,
title: str,
description: str,
target: Optional[str] = None,
color: int = 0x00FF00,
):
embed = {
"title": title,
"description": description[:1900] if description else "",
@@ -64,15 +70,20 @@ async def discord_notify(webhook_url: str, title: str, description: str, target:
while True: # permanent retry
try:
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp:
async with session.post(
webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status >= 400:
text = await resp.text()
raise RuntimeError(f"Discord webhook failed ({resp.status}): {text}")
raise RuntimeError(
f"Discord webhook failed ({resp.status}): {text}"
)
break
except Exception as e:
print(f"Discord send failed, retrying: {e}")
await asyncio.sleep(5)
def send_log_to_discord(message: str, level: str, target: Optional[str] = None):
colors = {"WARNING": 0xFFA500, "ERROR": 0xFF0000, "CRITICAL": 0xFF0000}
color = colors.get(level.upper(), 0xFFFF00)
@@ -83,7 +94,7 @@ def send_log_to_discord(message: str, level: str, target: Optional[str] = None):
title=f"{level} in bulk_download",
description=message,
target=target,
color=color
color=color,
)
try:
@@ -98,6 +109,7 @@ def send_log_to_discord(message: str, level: str, target: Optional[str] = None):
# ---------- Helpers ----------
def tag_with_mediafile(file_path: str, meta: dict):
f = MediaFile(file_path)
def safe_set(attr, value, default=None, cast=None):
if value is None:
value = default
@@ -106,6 +118,7 @@ def tag_with_mediafile(file_path: str, meta: dict):
setattr(f, attr, cast(value))
else:
setattr(f, attr, str(value))
safe_set("title", meta.get("title"), default="Unknown Title")
safe_set("artist", meta.get("artist"), default="Unknown Artist")
safe_set("albumartist", meta.get("album_artist"), default="Unknown Artist")
@@ -136,6 +149,7 @@ def tag_with_mediafile(file_path: str, meta: dict):
if not cover_bytes and cover_url:
try:
import requests
resp = requests.get(cover_url, timeout=10)
resp.raise_for_status()
cover_bytes = resp.content
@@ -188,7 +202,7 @@ def ensure_unique_filename_in_dir(parent: Path, filename: str) -> Path:
# special-case .tar.gz
if filename.lower().endswith(".tar.gz"):
ext = ".tar.gz"
base = filename[:-len(ext)]
base = filename[: -len(ext)]
else:
p = Path(filename)
ext = p.suffix
@@ -235,13 +249,15 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
send_log_to_discord(f"Failed to init job.meta: {e}", "WARNING", target)
# Job started Discord message
asyncio.run(discord_notify(
DISCORD_WEBHOOK,
title=f"Job Started: {job_id}",
description=f"Processing `{len(track_list)}` track(s)",
target=target,
color=0x00FFFF
))
asyncio.run(
discord_notify(
DISCORD_WEBHOOK,
title=f"Job Started: {job_id}",
description=f"Processing `{len(track_list)}` track(s)",
target=target,
color=0x00FFFF,
)
)
async def process_tracks():
per_track_meta = []
@@ -253,7 +269,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
# Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil
async def _rate_limit_notify(exc: Exception):
try:
send_log_to_discord(f"Rate limit observed while fetching metadata: {exc}", "WARNING", target)
send_log_to_discord(
f"Rate limit observed while fetching metadata: {exc}",
"WARNING",
target,
)
except Exception:
pass
@@ -265,7 +285,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
pass
total = len(track_list or [])
for i, track_id in enumerate(track_list or []):
track_info = {"track_id": str(track_id), "status": "Pending", "file_path": None, "error": None, "attempts": 0}
track_info = {
"track_id": str(track_id),
"status": "Pending",
"file_path": None,
"error": None,
"attempts": 0,
}
attempt = 0
while attempt < MAX_RETRIES:
@@ -326,13 +352,19 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
# Try to fetch cover art via SRUtil (use album_id from metadata)
try:
album_field = md.get("album")
album_id = md.get("album_id") or (album_field.get("id") if isinstance(album_field, dict) else None)
album_id = md.get("album_id") or (
album_field.get("id")
if isinstance(album_field, dict)
else None
)
except Exception:
album_id = None
if album_id:
try:
cover_url = await sr.get_cover_by_album_id(album_id, size=640)
cover_url = await sr.get_cover_by_album_id(
album_id, size=640
)
except Exception:
cover_url = None
else:
@@ -344,7 +376,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
if cover_url:
try:
timeout = aiohttp.ClientTimeout(total=15)
async with session.get(cover_url, timeout=timeout) as img_resp:
async with session.get(
cover_url, timeout=timeout
) as img_resp:
if img_resp.status == 200:
img_bytes = await img_resp.read()
else:
@@ -375,23 +409,24 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
# Prefer music_tag if available (keeps compatibility with add_cover_art.py)
try:
from music_tag import load_file as mt_load_file # type: ignore
try:
mf = mt_load_file(str(final_file))
# set basic tags
if md.get('title'):
mf['title'] = md.get('title')
if md.get('artist'):
mf['artist'] = md.get('artist')
if md.get('album'):
mf['album'] = md.get('album')
tracknum = md.get('track_number')
if md.get("title"):
mf["title"] = md.get("title")
if md.get("artist"):
mf["artist"] = md.get("artist")
if md.get("album"):
mf["album"] = md.get("album")
tracknum = md.get("track_number")
if tracknum is not None:
try:
mf['tracknumber'] = int(tracknum)
mf["tracknumber"] = int(tracknum)
except Exception:
pass
if img_bytes:
mf['artwork'] = img_bytes
mf["artwork"] = img_bytes
mf.save()
embedded = True
except Exception:
@@ -438,7 +473,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
wait_time = min(60, 2**attempt)
await asyncio.sleep(wait_time)
else:
await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
await asyncio.sleep(
random.uniform(THROTTLE_MIN, THROTTLE_MAX)
)
except Exception as e:
tb = traceback.format_exc()
@@ -447,7 +484,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
track_info["error"] = str(e)
if attempt >= MAX_RETRIES:
track_info["status"] = "Failed"
send_log_to_discord(f"Track {track_id} failed after {attempt} attempts", "ERROR", target)
send_log_to_discord(
f"Track {track_id} failed after {attempt} attempts",
"ERROR",
target,
)
await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
finally:
@@ -464,7 +505,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.meta["tarball"] = None
job.meta["status"] = "Failed"
job.save_meta()
send_log_to_discord(f"No tracks were successfully downloaded for job `{job_id}`", "CRITICAL", target)
send_log_to_discord(
f"No tracks were successfully downloaded for job `{job_id}`",
"CRITICAL",
target,
)
return []
# Tarball creation
@@ -476,7 +521,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
except Exception:
artist = "Unknown Artist"
artist_counts[artist] = artist_counts.get(artist, 0) + 1
top_artist = sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0] if artist_counts else "Unknown Artist"
top_artist = (
sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0]
if artist_counts
else "Unknown Artist"
)
# Prefer `job.meta['target']` when provided by the enqueuer. Fall back to the top artist.
target_name = None
try:
@@ -485,7 +534,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
except Exception:
target_name = None
base_label = sanitize_filename(target_name) if target_name else sanitize_filename(top_artist)
base_label = (
sanitize_filename(target_name)
if target_name
else sanitize_filename(top_artist)
)
staged_tarball = staging_root / f"{base_label}.tar.gz"
counter = 1
@@ -504,14 +557,24 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
job.save_meta()
logging.info("Creating tarball: %s", staged_tarball)
await discord_notify(DISCORD_WEBHOOK,
title=f"Compressing: Job {job_id}",
description=f"Creating tarball: `{len(all_final_files)}` track(s).\nStaging path: {staged_tarball}",
color=0xFFA500,
target=target)
await discord_notify(
DISCORD_WEBHOOK,
title=f"Compressing: Job {job_id}",
description=f"Creating tarball: `{len(all_final_files)}` track(s).\nStaging path: {staged_tarball}",
color=0xFFA500,
target=target,
)
try:
subprocess.run(
["tar", "-I", "pigz -9", "-cf", str(staged_tarball), "-C", str(staging_root)]
[
"tar",
"-I",
"pigz -9",
"-cf",
str(staged_tarball),
"-C",
str(staging_root),
]
+ [str(f.relative_to(staging_root)) for f in all_final_files],
check=True,
)
@@ -521,7 +584,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
except Exception:
pass
except FileNotFoundError:
send_log_to_discord("pigz not available, falling back to tarfile (slower).", "WARNING", target)
send_log_to_discord(
"pigz not available, falling back to tarfile (slower).",
"WARNING",
target,
)
with tarfile.open(staged_tarball, "w:gz") as tar:
for f in all_final_files:
try:
@@ -535,7 +602,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
pass
if not staged_tarball.exists():
send_log_to_discord(f"Tarball was not created: `{staged_tarball}`", "CRITICAL", target)
send_log_to_discord(
f"Tarball was not created: `{staged_tarball}`", "CRITICAL", target
)
if job:
job.meta["status"] = "compress_failed"
job.save_meta()
@@ -556,13 +625,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
# Job completed Discord message
completed = len(all_final_files)
failed = (len(track_list) - completed)
failed = len(track_list) - completed
await discord_notify(
DISCORD_WEBHOOK,
title=f"Job Completed: {job_id}",
description=f"Processed `{len(track_list)}` track(s).\nCompleted: `{completed}`\nFailed: `{failed}`\nTarball: `{final_tarball}`",
target=target,
color=0x00FF00
color=0x00FF00,
)
return [str(final_tarball)]
@@ -572,7 +641,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"):
try:
return loop.run_until_complete(process_tracks())
except Exception as e:
send_log_to_discord(f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target)
send_log_to_discord(
f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target
)
if job:
job.meta["status"] = "Failed"
job.save_meta()

View File

@@ -30,7 +30,6 @@ for name in [__name__, "utils.sr_wrapper"]:
logging.getLogger().setLevel(logging.CRITICAL)
load_dotenv()
@@ -66,7 +65,9 @@ class SRUtil:
self.streamrip_client = TidalClient(self.streamrip_config)
self.MAX_CONCURRENT_METADATA_REQUESTS = 2
self.METADATA_RATE_LIMIT = 1.25
self.METADATA_SEMAPHORE = asyncio.Semaphore(self.MAX_CONCURRENT_METADATA_REQUESTS)
self.METADATA_SEMAPHORE = asyncio.Semaphore(
self.MAX_CONCURRENT_METADATA_REQUESTS
)
self.LAST_METADATA_REQUEST = 0
self.MAX_METADATA_RETRIES = 5
self.METADATA_ALBUM_CACHE: dict[str, dict] = {}
@@ -77,16 +78,18 @@ class SRUtil:
self._rate_limit_notified = False
async def rate_limited_request(self, func, *args, **kwargs):
async with self.METADATA_SEMAPHORE:
now = time.time()
elapsed = now - self.LAST_METADATA_REQUEST
if elapsed < self.METADATA_RATE_LIMIT:
await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed)
result = await func(*args, **kwargs)
self.LAST_METADATA_REQUEST = time.time()
return result
async with self.METADATA_SEMAPHORE:
now = time.time()
elapsed = now - self.LAST_METADATA_REQUEST
if elapsed < self.METADATA_RATE_LIMIT:
await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed)
result = await func(*args, **kwargs)
self.LAST_METADATA_REQUEST = time.time()
return result
async def _safe_api_call(self, func, *args, retries: int = 2, backoff: float = 0.5, **kwargs):
async def _safe_api_call(
self, func, *args, retries: int = 2, backoff: float = 0.5, **kwargs
):
"""Call an async API function with resilient retry behavior.
- On AttributeError: attempt a `login()` once and retry.
@@ -116,7 +119,11 @@ class SRUtil:
if ("400" in msg or "429" in msg) and attempt < retries - 1:
# Notify on the first observed 429 (if a callback is set)
try:
if "429" in msg and not self._rate_limit_notified and self.on_rate_limit:
if (
"429" in msg
and not self._rate_limit_notified
and self.on_rate_limit
):
self._rate_limit_notified = True
try:
if asyncio.iscoroutinefunction(self.on_rate_limit):
@@ -128,17 +135,29 @@ class SRUtil:
pass
except Exception:
pass
await asyncio.sleep(backoff * (2 ** attempt))
await asyncio.sleep(backoff * (2**attempt))
continue
# Connection related errors — try to re-login then retry
if isinstance(e, (aiohttp.ClientError, OSError, ConnectionError, asyncio.TimeoutError)) or "Connection" in msg or "closed" in msg.lower():
if (
isinstance(
e,
(
aiohttp.ClientError,
OSError,
ConnectionError,
asyncio.TimeoutError,
),
)
or "Connection" in msg
or "closed" in msg.lower()
):
try:
await self.streamrip_client.login()
except Exception:
pass
if attempt < retries - 1:
await asyncio.sleep(backoff * (2 ** attempt))
await asyncio.sleep(backoff * (2**attempt))
continue
# Unhandled / permanent error: re-raise after loop ends
@@ -151,10 +170,23 @@ class SRUtil:
if not expected or not actual:
return False
return fuzz.token_set_ratio(expected.lower(), actual.lower()) >= threshold
def is_metadata_match(self, expected_artist, expected_album, expected_title, found_artist, found_album, found_title, threshold=80):
def is_metadata_match(
self,
expected_artist,
expected_album,
expected_title,
found_artist,
found_album,
found_title,
threshold=80,
):
artist_match = self.is_fuzzy_match(expected_artist, found_artist, threshold)
album_match = self.is_fuzzy_match(expected_album, found_album, threshold) if expected_album else True
album_match = (
self.is_fuzzy_match(expected_album, found_album, threshold)
if expected_album
else True
)
title_match = self.is_fuzzy_match(expected_title, found_title, threshold)
return artist_match and album_match and title_match
@@ -166,7 +198,9 @@ class SRUtil:
deduped[norm] = entry
return list(deduped.values())
def group_artists_by_name(self, entries: list[dict], query: Optional[str] = None) -> list[dict]:
def group_artists_by_name(
self, entries: list[dict], query: Optional[str] = None
) -> list[dict]:
"""
Group artist entries by normalized display name and pick a primary candidate per name.
@@ -199,10 +233,15 @@ class SRUtil:
score = 0.0
if query:
try:
if it.get("artist", "").strip().lower() == query.strip().lower():
if (
it.get("artist", "").strip().lower()
== query.strip().lower()
):
score += 1000.0
else:
score += float(fuzz.token_set_ratio(query, it.get("artist", "")))
score += float(
fuzz.token_set_ratio(query, it.get("artist", ""))
)
except Exception:
score += 0.0
# add small weight for popularity if present
@@ -216,12 +255,14 @@ class SRUtil:
primary = scored[0][1]
alternatives = [it for _, it in scored[1:]]
out.append({
"artist": primary.get("artist"),
"id": primary.get("id"),
"popularity": primary.get("popularity"),
"alternatives": alternatives,
})
out.append(
{
"artist": primary.get("artist"),
"id": primary.get("id"),
"popularity": primary.get("popularity"),
"alternatives": alternatives,
}
)
return out
@@ -230,14 +271,16 @@ class SRUtil:
return None
m, s = divmod(seconds, 60)
return f"{m}:{s:02}"
def _get_tidal_cover_url(self, uuid, size):
"""Generate a tidal cover url.
:param uuid: VALID uuid string
:param size:
"""
TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
TIDAL_COVER_URL = (
"https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg"
)
possibles = (80, 160, 320, 640, 1280)
assert size in possibles, f"size must be in {possibles}"
return TIDAL_COVER_URL.format(
@@ -246,8 +289,6 @@ class SRUtil:
width=size,
)
def combine_album_track_metadata(
self, album_json: dict | None, track_json: dict
) -> dict:
@@ -288,10 +329,14 @@ class SRUtil:
"peak": track_json.get("peak"),
"lyrics": track_json.get("lyrics"),
"track_copyright": track_json.get("copyright"),
"cover_id": track_json.get("album", {}).get("cover") or album_json.get("cover"),
"cover_id": track_json.get("album", {}).get("cover")
or album_json.get("cover"),
"cover_url": (
f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg"
if (track_json.get("album", {}).get("cover") or album_json.get("cover"))
if (
track_json.get("album", {}).get("cover")
or album_json.get("cover")
)
else None
),
}
@@ -299,7 +344,6 @@ class SRUtil:
return combined
def combine_album_with_all_tracks(
self, album_json: dict[str, Any]
) -> list[dict[str, Any]]:
@@ -309,7 +353,9 @@ class SRUtil:
for t in album_json.get("tracks", [])
]
async def get_artists_by_name(self, artist_name: str, group: bool = False) -> Optional[list]:
async def get_artists_by_name(
self, artist_name: str, group: bool = False
) -> Optional[list]:
"""Get artist(s) by name.
Args:
@@ -324,7 +370,12 @@ class SRUtil:
delay = 1.0
for attempt in range(max_retries):
try:
artists = await self._safe_api_call(self.streamrip_client.search, media_type="artist", query=artist_name, retries=3)
artists = await self._safe_api_call(
self.streamrip_client.search,
media_type="artist",
query=artist_name,
retries=3,
)
break
except Exception as e:
msg = str(e)
@@ -344,7 +395,9 @@ class SRUtil:
artists_page = artists[0] if len(artists) > 0 else {}
else:
artists_page = artists
artists_items = artists_page.get("items", []) if isinstance(artists_page, dict) else []
artists_items = (
artists_page.get("items", []) if isinstance(artists_page, dict) else []
)
if not artists_items:
return None
artists_out = [
@@ -365,13 +418,19 @@ class SRUtil:
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
"""Get albums by artist ID. Retry login only on authentication failure. Rate limit and retry on 400/429."""
import asyncio
artist_id_str: str = str(artist_id)
albums_out: list[dict] = []
max_retries = 4
delay = 1.0
for attempt in range(max_retries):
try:
metadata = await self._safe_api_call(self.streamrip_client.get_metadata, artist_id_str, "artist", retries=3)
metadata = await self._safe_api_call(
self.streamrip_client.get_metadata,
artist_id_str,
"artist",
retries=3,
)
break
except Exception as e:
msg = str(e)
@@ -397,10 +456,10 @@ class SRUtil:
if "title" in album and "id" in album and "artists" in album
]
return albums_out
async def get_album_by_name(self, artist: str, album: str) -> Optional[dict]:
"""Get album by artist and album name using artist ID and fuzzy matching. Try first 8 chars, then 12 if no match. Notify on success."""
# Notification moved to add_cover_art.py as requested
# Notification moved to add_cover_art.py as requested
for trunc in (8, 12):
search_artist = artist[:trunc]
artists = await self.get_artists_by_name(search_artist)
@@ -429,15 +488,22 @@ class SRUtil:
if best_album and best_album_score >= 85:
return best_album
return None
async def get_cover_by_album_id(self, album_id: int, size: int = 640) -> Optional[str]:
async def get_cover_by_album_id(
self, album_id: int, size: int = 640
) -> Optional[str]:
"""Get cover URL by album ID. Retry login only on authentication failure."""
if size not in [80, 160, 320, 640, 1280]:
return None
album_id_str: str = str(album_id)
for attempt in range(2):
try:
metadata = await self._safe_api_call(self.streamrip_client.get_metadata, item_id=album_id_str, media_type="album", retries=2)
metadata = await self._safe_api_call(
self.streamrip_client.get_metadata,
item_id=album_id_str,
media_type="album",
retries=2,
)
break
except Exception:
if attempt == 1:
@@ -452,7 +518,6 @@ class SRUtil:
cover_url = self._get_tidal_cover_url(cover_id, size)
return cover_url
async def get_tracks_by_album_id(
self, album_id: int, quality: str = "FLAC"
) -> Optional[list | dict]:
@@ -464,7 +529,12 @@ class SRUtil:
"""
album_id_str = str(album_id)
try:
metadata = await self._safe_api_call(self.streamrip_client.get_metadata, item_id=album_id_str, media_type="album", retries=2)
metadata = await self._safe_api_call(
self.streamrip_client.get_metadata,
item_id=album_id_str,
media_type="album",
retries=2,
)
except Exception as e:
logging.warning("get_tracks_by_album_id failed: %s", e)
return None
@@ -486,7 +556,9 @@ class SRUtil:
]
return tracks_out
async def get_tracks_by_artist_song(self, artist: str, song: str, n: int = 0) -> Optional[list]:
async def get_tracks_by_artist_song(
self, artist: str, song: str, n: int = 0
) -> Optional[list]:
"""Get track by artist and song name
Args:
artist (str): The name of the artist.
@@ -496,9 +568,18 @@ class SRUtil:
TODO: Reimplement using StreamRip
"""
try:
search_res = await self._safe_api_call(self.streamrip_client.search, media_type="track", query=f"{artist} - {song}", retries=3)
search_res = await self._safe_api_call(
self.streamrip_client.search,
media_type="track",
query=f"{artist} - {song}",
retries=3,
)
logging.critical("Result: %s", search_res)
return search_res[0].get('items') if search_res and isinstance(search_res, list) else []
return (
search_res[0].get("items")
if search_res and isinstance(search_res, list)
else []
)
except Exception as e:
traceback.print_exc()
logging.critical("Search Exception: %s", str(e))
@@ -529,10 +610,15 @@ class SRUtil:
quality_int = 1
track_id_str: str = str(track_id)
# Ensure client is logged in via safe call when needed inside _safe_api_call
# Ensure client is logged in via safe call when needed inside _safe_api_call
try:
logging.critical("Using quality_int: %s", quality_int)
track = await self._safe_api_call(self.streamrip_client.get_downloadable, track_id=track_id_str, quality=quality_int, retries=3)
track = await self._safe_api_call(
self.streamrip_client.get_downloadable,
track_id=track_id_str,
quality=quality_int,
retries=3,
)
except Exception as e:
logging.warning("get_stream_url_by_track_id failed: %s", e)
return None
@@ -557,7 +643,7 @@ class SRUtil:
metadata = await self.rate_limited_request(
self.streamrip_client.get_metadata, str(track_id), "track"
)
album_id = metadata.get("album", {}).get("id")
album_metadata = None
@@ -567,7 +653,11 @@ class SRUtil:
album_metadata = self.METADATA_ALBUM_CACHE[album_id]
else:
album_metadata = await self.rate_limited_request(
lambda i, t: self._safe_api_call(self.streamrip_client.get_metadata, i, t, retries=2), album_id, "album"
lambda i, t: self._safe_api_call(
self.streamrip_client.get_metadata, i, t, retries=2
),
album_id,
"album",
)
if not album_metadata:
return None
@@ -611,11 +701,12 @@ class SRUtil:
self.MAX_METADATA_RETRIES,
)
# Raise a specific exception so callers can react (e.g. notify)
raise MetadataFetchError(f"Metadata fetch failed permanently for track {track_id} after {self.MAX_METADATA_RETRIES} attempts: {e}")
raise MetadataFetchError(
f"Metadata fetch failed permanently for track {track_id} after {self.MAX_METADATA_RETRIES} attempts: {e}"
)
# If we reach here without returning, raise a generic metadata error
raise MetadataFetchError(f"Metadata fetch failed for track {track_id}")
async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str:
"""Download track
Args: