diff --git a/endpoints/constructors.py b/endpoints/constructors.py index 2bc99a6..fa667ba 100644 --- a/endpoints/constructors.py +++ b/endpoints/constructors.py @@ -15,7 +15,10 @@ class LastFMException(Exception): class ValidArtistSearchRequest(BaseModel): """ - - **a**: artist name + Request model for searching an artist by name. + + Attributes: + - **a** (str): Artist name. """ a: str @@ -33,8 +36,11 @@ class ValidArtistSearchRequest(BaseModel): class ValidAlbumDetailRequest(BaseModel): """ - - **a**: artist name - - **release**: album/release name (as sourced from here/LastFM) + Request model for album details. + + Attributes: + - **a** (str): Artist name. + - **release** (str): Album/release name. """ a: str @@ -54,8 +60,11 @@ class ValidAlbumDetailRequest(BaseModel): class ValidTrackInfoRequest(BaseModel): """ - - **a**: artist name - - **t**: track + Request model for track info. + + Attributes: + - **a** (str): Artist name. + - **t** (str): Track name. """ a: str @@ -80,7 +89,10 @@ Rand Msg class RandMsgRequest(BaseModel): """ - - **short**: Short randmsg? + Request model for random message. + + Attributes: + - **short** (Optional[bool]): Short randmsg? """ short: Optional[bool] = False @@ -93,7 +105,10 @@ YT class ValidYTSearchRequest(BaseModel): """ - - **t**: title to search + Request model for YouTube search. + + Attributes: + - **t** (str): Title to search. """ t: str = "rick astley - never gonna give you up" @@ -106,7 +121,10 @@ Transcriptions class ValidShowEpisodeListRequest(BaseModel): """ - - **s**: show id + Request model for show episode list. + + Attributes: + - **s** (int): Show ID. """ s: int @@ -114,8 +132,11 @@ class ValidShowEpisodeListRequest(BaseModel): class ValidShowEpisodeLineRequest(BaseModel): """ - - **s**: show id - - **e**: episode id + Request model for show episode lines. + + Attributes: + - **s** (int): Show ID. + - **e** (int): Episode ID. """ s: int @@ -129,14 +150,17 @@ Lyric Search class ValidLyricRequest(BaseModel): """ - - **a**: artist - - **s**: song - - **t**: track (artist and song combined) [used only if a & s are not used] - - **extra**: include extra details in response [optional, default: false] - - **lrc**: Request LRCs? - - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional] - - **src**: the script/utility which initiated the request - - **excluded_sources**: sources to exclude (new only) + Request model for lyric search. + + Attributes: + - **a** (Optional[str]): Artist. + - **s** (Optional[str]): Song. + - **t** (Optional[str]): Track (artist and song combined). + - **extra** (Optional[bool]): Include extra details in response. + - **lrc** (Optional[bool]): Request LRCs? + - **sub** (Optional[str]): Text to search within lyrics. + - **src** (str): The script/utility which initiated the request. + - **excluded_sources** (Optional[list]): Sources to exclude. """ a: Optional[str] = None @@ -166,7 +190,10 @@ class ValidLyricRequest(BaseModel): class ValidTypeAheadRequest(BaseModel): """ - - **query**: query string + Request model for typeahead query. + + Attributes: + - **query** (str): Query string. """ query: str @@ -183,11 +210,15 @@ class RadioException(Exception): class ValidRadioSongRequest(BaseModel): """ - - **key**: API Key - - **artist**: artist to search - - **song**: song to search - - **artistsong**: may be used IN PLACE OF artist/song to perform a combined/string search in the format "artist - song" - - **alsoSkip**: Whether to skip immediately to this track [not implemented] + Request model for radio song request. + + Attributes: + - **key** (str): API Key. + - **artist** (Optional[str]): Artist to search. + - **song** (Optional[str]): Song to search. + - **artistsong** (Optional[str]): Combined artist-song string. + - **alsoSkip** (Optional[bool]): Whether to skip immediately to this track. + - **station** (Station): Station name. """ key: str @@ -200,7 +231,10 @@ class ValidRadioSongRequest(BaseModel): class ValidRadioTypeaheadRequest(BaseModel): """ - - **query**: Typeahead query + Request model for radio typeahead. + + Attributes: + - **query** (str): Typeahead query. """ query: str @@ -208,8 +242,11 @@ class ValidRadioTypeaheadRequest(BaseModel): class ValidRadioQueueGetRequest(BaseModel): """ - - **key**: API key (optional, needed if specifying a non-default limit) - - **limit**: optional, default: 15k + Request model for radio queue get. + + Attributes: + - **key** (Optional[str]): API key. + - **limit** (Optional[int]): Result limit. """ key: Optional[str] = None @@ -218,9 +255,12 @@ class ValidRadioQueueGetRequest(BaseModel): class ValidRadioNextRequest(BaseModel): """ - - **key**: API Key - - **skipTo**: UUID to skip to [optional] - - **station**: Station (default: "main") + Request model for radio next track. + + Attributes: + - **key** (str): API Key. + - **skipTo** (Optional[str]): UUID to skip to. + - **station** (Station): Station name. """ key: str @@ -230,17 +270,23 @@ class ValidRadioNextRequest(BaseModel): class ValidRadioReshuffleRequest(ValidRadioNextRequest): """ - - **key**: API Key - - **station**: Station (default: "main") + Request model for radio reshuffle. + + Attributes: + - **key** (str): API Key. + - **station** (Station): Station name. """ class ValidRadioQueueRequest(BaseModel): """ - - **draw**: DataTables draw count, default 1 - - **start**: paging start position, default 0 - - **search**: Optional search query - - **station**: Station (default: "main") + Request model for radio queue. + + Attributes: + - **draw** (Optional[int]): DataTables draw count. + - **start** (Optional[int]): Paging start position. + - **search** (Optional[str]): Search query. + - **station** (Station): Station name. """ draw: Optional[int] = 1 @@ -251,10 +297,13 @@ class ValidRadioQueueRequest(BaseModel): class ValidRadioQueueShiftRequest(BaseModel): """ - - **key**: API Key - - **uuid**: UUID to shift - - **next**: Play next if true, immediately if false, default False - - **station**: Station (default: "main") + Request model for radio queue shift. + + Attributes: + - **key** (str): API Key. + - **uuid** (str): UUID to shift. + - **next** (Optional[bool]): Play next if true. + - **station** (Station): Station name. """ key: str @@ -265,9 +314,12 @@ class ValidRadioQueueShiftRequest(BaseModel): class ValidRadioQueueRemovalRequest(BaseModel): """ - - **key**: API Key - - **uuid**: UUID to remove - - **station**: Station (default: "main") + Request model for radio queue removal. + + Attributes: + - **key** (str): API Key. + - **uuid** (str): UUID to remove. + - **station** (Station): Station name. """ key: str diff --git a/endpoints/lastfm.py b/endpoints/lastfm.py index b20e213..dd1a9c0 100644 --- a/endpoints/lastfm.py +++ b/endpoints/lastfm.py @@ -17,6 +17,7 @@ class LastFM(FastAPI): """Last.FM Endpoints""" def __init__(self, app: FastAPI, util, constants) -> None: + """Initialize LastFM endpoints.""" self.app: FastAPI = app self.util = util self.constants = constants @@ -44,8 +45,13 @@ class LastFM(FastAPI): self, data: ValidArtistSearchRequest ) -> JSONResponse: """ - Get artist info - - **a**: Artist to search + Get artist information by name. + + Parameters: + - **data** (ValidArtistSearchRequest): Request containing artist name. + + Returns: + - **JSONResponse**: Contains artist information or an error message. """ artist: Optional[str] = data.a.strip() if not artist: @@ -81,8 +87,13 @@ class LastFM(FastAPI): self, data: ValidArtistSearchRequest ) -> JSONResponse: """ - Get artist's albums/releases - - **a**: Artist to search + Get artist's albums/releases. + + Parameters: + - **data** (ValidArtistSearchRequest): Request containing artist name. + + Returns: + - **JSONResponse**: Contains a list of albums or an error message. """ artist: str = data.a.strip() if not artist: @@ -121,9 +132,13 @@ class LastFM(FastAPI): self, data: ValidAlbumDetailRequest ) -> JSONResponse: """ - Get details of a particular release by an artist - - **a**: Artist to search - - **release**: Release title to search + Get details of a particular release by an artist. + + Parameters: + - **data** (ValidAlbumDetailRequest): Request containing artist and release name. + + Returns: + - **JSONResponse**: Release details or error. """ artist: str = data.a.strip() release: str = data.release.strip() @@ -157,9 +172,13 @@ class LastFM(FastAPI): self, data: ValidAlbumDetailRequest ) -> JSONResponse: """ - Get track list for a particular release by an artist - - **a**: Artist to search - - **release**: Release title to search + Get track list for a particular release by an artist. + + Parameters: + - **data** (ValidAlbumDetailRequest): Request containing artist and release name. + + Returns: + - **JSONResponse**: Track list or error. """ artist: str = data.a.strip() release: str = data.release.strip() @@ -189,9 +208,13 @@ class LastFM(FastAPI): async def track_info_handler(self, data: ValidTrackInfoRequest) -> JSONResponse: """ - Get track info from Last.FM given an artist/track - - **a**: Artist to search - - **t**: Track title to search + Get track info from Last.FM given an artist/track. + + Parameters: + - **data** (ValidTrackInfoRequest): Request containing artist and track name. + + Returns: + - **JSONResponse**: Track info or error. """ try: artist: str = data.a diff --git a/endpoints/lyric_search.py b/endpoints/lyric_search.py index 1e50c3e..f123dd3 100644 --- a/endpoints/lyric_search.py +++ b/endpoints/lyric_search.py @@ -20,12 +20,21 @@ class CacheUtils: """ def __init__(self) -> None: + """Initialize CacheUtils.""" self.lyrics_db_path: LiteralString = os.path.join( "/usr/local/share", "sqlite_dbs", "cached_lyrics.db" ) async def check_typeahead(self, query: str) -> Optional[list[str]]: - """Lyric Search Typeahead DB Handler""" + """ + Check typeahead suggestions for lyric search. + + Args: + query: The search query. + + Returns: + List of matching artist-song strings, or None if query is empty. + """ if not query: return None async with sqlite3.connect(self.lyrics_db_path, timeout=1) as _db: @@ -46,6 +55,7 @@ class LyricSearch(FastAPI): """ def __init__(self, app: FastAPI, util, constants) -> None: + """Initialize LyricSearch endpoints.""" self.app: FastAPI = app self.util = util self.constants = constants @@ -92,8 +102,13 @@ class LyricSearch(FastAPI): async def typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse: """ - Lyric search typeahead handler - - **query**: Typeahead query + Handle lyric search typeahead requests. + + Parameters: + - **data** (ValidTypeAheadRequest): Request containing the query. + + Returns: + - **JSONResponse**: Typeahead suggestions or error. """ if not isinstance(data.query, str): return JSONResponse( @@ -112,15 +127,13 @@ class LyricSearch(FastAPI): async def lyric_search_handler(self, data: ValidLyricRequest) -> JSONResponse: """ - Search for lyrics - - **a**: artist - - **s**: song - - **t**: track (artist and song combined) [used only if a & s are not used] - - **extra**: include extra details in response [optional, default: false] - - **lrc**: Request LRCs? - - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none] - - **src**: the script/utility which initiated the request - - **excluded_sources**: sources to exclude [optional, default: none] + Search for lyrics. + + Parameters: + - **data** (ValidLyricRequest): Request containing artist, song, and other parameters. + + Returns: + - **JSONResponse**: Lyrics data or error. """ if (not data.a or not data.s) and not data.t or not data.src: raise HTTPException(detail="Invalid request", status_code=500) diff --git a/endpoints/meme.py b/endpoints/meme.py index 43b5d8c..ca6b97b 100644 --- a/endpoints/meme.py +++ b/endpoints/meme.py @@ -6,10 +6,11 @@ from utils.meme_util import MemeUtil class Meme(FastAPI): """ - Misc Endpoints + Meme Endpoints """ def __init__(self, app: FastAPI, my_util, constants) -> None: + """Initialize Meme endpoints.""" self.app: FastAPI = app self.util = my_util self.meme_util = MemeUtil(constants) @@ -35,21 +36,47 @@ class Meme(FastAPI): ) async def get_meme_by_id(self, id: int, request: Request) -> Response: - """Get meme (image) by id""" + """ + Get meme image by ID. + + Parameters: + - **id** (int): Meme ID. + - **request** (Request): The request object. + + Returns: + - **Response**: Image response or 404. + """ meme_image = await self.meme_util.get_meme_by_id(id) if not meme_image: return Response(status_code=404, content="Not found") return Response(content=meme_image, media_type="image/png") async def random_meme(self, request: Request) -> Response: - """Get random meme (image)""" + """ + Get a random meme image. + + Parameters: + - **request** (Request): The request object. + + Returns: + - **Response**: Image response or 404. + """ meme_image = await self.meme_util.get_random_meme() if not meme_image: return Response(status_code=404, content="Not found") return Response(content=meme_image, media_type="image/png") async def list_memes(self, page: int, request: Request) -> Response: - """Get meme (image) by id""" + """ + List memes with pagination. + + Parameters: + - **page** (int): Page number. + - **request** (Request): The request object. + + Returns: + - **JSONResponse**: List of memes with paging info. + """ meme_list = await self.meme_util.list_memes(page) page_count = await self.meme_util.get_page_count() return JSONResponse( diff --git a/endpoints/misc.py b/endpoints/misc.py index 98d8854..d759a85 100644 --- a/endpoints/misc.py +++ b/endpoints/misc.py @@ -24,6 +24,7 @@ class Misc(FastAPI): """ def __init__(self, app: FastAPI, my_util, constants, radio) -> None: + """Initialize Misc endpoints.""" self.app: FastAPI = app self.util = my_util self.constants = constants @@ -84,12 +85,29 @@ class Misc(FastAPI): return "No." async def no(self) -> JSONResponse: - """NaaS""" + """ + Get a random 'no' reason. + + Returns: + - **JSONResponse**: Contains a random 'no' reason. + """ + return JSONResponse(content={"no": self.get_no()}) async def upload_activity_image( self, image: UploadFile, key: Annotated[str, Form()], request: Request ) -> Response: + """ + Upload activity image. + + Parameters: + - **image** (UploadFile): The uploaded image file. + - **key** (str): The API key for authentication. + - **request** (Request): The HTTP request object. + + Returns: + - **Response**: Indicates success or failure of the upload. + """ if ( not key or not isinstance(key, str) @@ -112,6 +130,15 @@ class Misc(FastAPI): ) async def get_activity_image(self, request: Request) -> Response: + """ + Get the activity image. + + Parameters: + - **request** (Request): The HTTP request object. + + Returns: + - **Response**: The activity image or a fallback image. + """ if isinstance(self.activity_image, bytes): return Response(content=self.activity_image, media_type="image/png") @@ -125,11 +152,13 @@ class Misc(FastAPI): async def get_radio_np(self, station: str = "main") -> tuple[str, str, str]: """ - Get radio now playing + Get radio now playing info. + Args: - None + station: Station name. + Returns: - str: Radio now playing in artist - song format + Tuple of (artistsong, album, genre). """ np: dict = self.radio.radio_util.now_playing[station] @@ -145,7 +174,10 @@ class Misc(FastAPI): async def homepage_redis_widget(self) -> JSONResponse: """ - Homepage Redis Widget Handler + Get Redis stats for homepage widget. + + Returns: + - **JSONResponse**: Contains Redis stats. """ # Measure response time w/ test lyric search time_start: float = time.time() # Start time for response_time @@ -169,7 +201,10 @@ class Misc(FastAPI): async def homepage_rq_widget(self) -> JSONResponse: """ - Homepage RQ Widget Handler + Get RQ job stats for homepage widget. + + Returns: + - **JSONResponse**: Contains RQ job stats. """ queue_name = "dls" queue = Queue(queue_name, self.redis_client) @@ -193,7 +228,10 @@ class Misc(FastAPI): async def homepage_sqlite_widget(self) -> JSONResponse: """ - Homepage SQLite Widget Handler + Get SQLite stats for homepage widget. + + Returns: + - **JSONResponse**: Contains SQLite stats. """ row_count: int = await self.lyr_cache.sqlite_rowcount() distinct_artists: int = await self.lyr_cache.sqlite_distinct("artist") @@ -208,7 +246,10 @@ class Misc(FastAPI): async def homepage_lyrics_widget(self) -> JSONResponse: """ - Homepage Lyrics Widget Handler + Get lyrics stats for homepage widget. + + Returns: + - **JSONResponse**: Contains lyrics stats. """ found_counts: Optional[dict] = await self.redis_cache.get_found_counts() if not isinstance(found_counts, dict): @@ -228,7 +269,13 @@ class Misc(FastAPI): async def homepage_radio_widget(self, station: str = "main") -> JSONResponse: """ - Homepage Radio Widget Handler + Get radio now playing for homepage widget. + + Parameters: + - **station** (str): Station name. Defaults to "main". + + Returns: + - **JSONResponse**: Contains now playing info. """ radio_np: tuple = await self.get_radio_np(station) if not radio_np: diff --git a/endpoints/radio.py b/endpoints/radio.py index 0a432af..a5c696c 100644 --- a/endpoints/radio.py +++ b/endpoints/radio.py @@ -68,10 +68,14 @@ class Radio(FastAPI): self, data: ValidRadioNextRequest, request: Request ) -> JSONResponse: """ - Skip to the next track in the queue, or to uuid specified in skipTo if provided - - **key**: API key - - **skipTo**: Optional UUID to skip to - - **station**: default "main" + Skip to the next track in the queue, or to the UUID specified in `skipTo` if provided. + + Parameters: + - **data** (ValidRadioNextRequest): Contains the API key, optional UUID to skip to, and station name. + - **request** (Request): The HTTP request object. + + Returns: + - **JSONResponse**: Indicates success or failure of the skip operation. """ try: if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): @@ -114,10 +118,16 @@ class Radio(FastAPI): self, data: ValidRadioReshuffleRequest, request: Request ) -> JSONResponse: """ - Reshuffle the play queue - - **key**: API key - - **station**: default "main" + Reshuffle the play queue. + + Parameters: + - **data** (ValidRadioReshuffleRequest): Contains the API key and station name. + - **request** (Request): The HTTP request object. + + Returns: + - **JSONResponse**: Indicates success of the reshuffle operation. """ + if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): raise HTTPException(status_code=403, detail="Unauthorized") @@ -130,7 +140,14 @@ class Radio(FastAPI): data: Optional[ValidRadioQueueRequest] = None, ) -> JSONResponse: """ - Get current play queue (paged, 20 results per page) + Get the current play queue (paged, 20 results per page). + + Parameters: + - **request** (Request): The HTTP request object. + - **data** (Optional[ValidRadioQueueRequest]): Contains the station name and optional search query. + + Returns: + - **JSONResponse**: Contains the paged queue data. """ if not (data and data.station): return JSONResponse(status_code=500, @@ -191,13 +208,16 @@ class Radio(FastAPI): self, data: ValidRadioQueueShiftRequest, request: Request ) -> JSONResponse: """ - Shift position of a UUID within the queue - [currently limited to playing next or immediately] - - **key**: API key - - **uuid**: UUID to shift - - **next**: Play track next? If False, skips to the track - - **station**: default "main" + Shift the position of a UUID within the queue. + + Parameters: + - **data** (ValidRadioQueueShiftRequest): Contains the API key, UUID to shift, and station name. + - **request** (Request): The HTTP request object. + + Returns: + - **JSONResponse**: Indicates success of the shift operation. """ + if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): raise HTTPException(status_code=403, detail="Unauthorized") @@ -225,11 +245,16 @@ class Radio(FastAPI): self, data: ValidRadioQueueRemovalRequest, request: Request ) -> JSONResponse: """ - Remove an item from the current play queue - - **key**: API key - - **uuid**: UUID of queue item to remove - - **station**: default "main" + Remove an item from the current play queue. + + Parameters: + - **data** (ValidRadioQueueRemovalRequest): Contains the API key, UUID of the item to remove, and station name. + - **request** (Request): The HTTP request object. + + Returns: + - **JSONResponse**: Indicates success of the removal operation. """ + if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): raise HTTPException(status_code=403, detail="Unauthorized") @@ -254,10 +279,15 @@ class Radio(FastAPI): station: Station = "main" ) -> Response: """ - Get album art, optional parameter track_id may be specified. - Otherwise, current track album art will be pulled. - - **track_id**: Optional, if provided, will attempt to retrieve the album art of this track_id. Current track used otherwise. - - **station**: default "main" + Get album art for the current or specified track. + + Parameters: + - **request** (Request): The HTTP request object. + - **track_id** (Optional[int]): ID of the track to retrieve album art for. Defaults to the current track. + - **station** (Station): Name of the station. Defaults to "main". + + Returns: + - **Response**: Contains the album art image or a default image. """ try: if not track_id: @@ -288,9 +318,16 @@ class Radio(FastAPI): async def radio_now_playing(self, request: Request, station: Station = "main") -> JSONResponse: """ - Get currently playing track info - - **station**: default "main" + Get information about the currently playing track. + + Parameters: + - **request** (Request): The HTTP request object. + - **station** (Station): Name of the station. Defaults to "main". + + Returns: + - **JSONResponse**: Contains the track information. """ + ret_obj: dict = {**self.radio_util.now_playing[station]} ret_obj["station"] = station try: @@ -308,12 +345,17 @@ class Radio(FastAPI): background_tasks: BackgroundTasks, ) -> JSONResponse: """ - Get next track - (Track will be removed from the queue in the process.) - - **key**: API key - - **skipTo**: Optional UUID to skip to - - **station**: default: "main" + Get the next track in the queue. The track will be removed from the queue in the process. + + Parameters: + - **data** (ValidRadioNextRequest): Contains the API key, optional UUID to skip to, and station name. + - **request** (Request): The HTTP request object. + - **background_tasks** (BackgroundTasks): Background tasks for webhook execution. + + Returns: + - **JSONResponse**: Contains the next track information. """ + logging.info("Radio get next") if data.station not in self.radio_util.active_playlist.keys(): raise HTTPException(status_code=500, detail="No such station/not ready") @@ -372,14 +414,16 @@ class Radio(FastAPI): self, data: ValidRadioSongRequest, request: Request ) -> JSONResponse: """ - Song request handler - - **key**: API key - - **artist**: Artist to search - - **song**: Song to search - - **artistsong**: Optional "Artist - Song" pair to search, in place of artist/song - - **alsoSkip**: If True, skips to the track; otherwise, track will be placed next up in queue - - **station**: default "main" + Handle song requests. + + Parameters: + - **data** (ValidRadioSongRequest): Contains the API key, artist, song, and station name. + - **request** (Request): The HTTP request object. + + Returns: + - **JSONResponse**: Indicates success or failure of the request. """ + if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): raise HTTPException(status_code=403, detail="Unauthorized") artistsong: Optional[str] = data.artistsong @@ -413,8 +457,14 @@ class Radio(FastAPI): self, data: ValidRadioTypeaheadRequest, request: Request ) -> JSONResponse: """ - Radio typeahead handler - - **query**: Typeahead query + Handle typeahead queries for the radio. + + Parameters: + - **data** (ValidRadioTypeaheadRequest): Contains the typeahead query. + - **request** (Request): The HTTP request object. + + Returns: + - **JSONResponse**: Contains the typeahead results. """ if not isinstance(data.query, str): return JSONResponse( diff --git a/endpoints/rand_msg.py b/endpoints/rand_msg.py index 4be299a..b1d19c3 100644 --- a/endpoints/rand_msg.py +++ b/endpoints/rand_msg.py @@ -14,6 +14,7 @@ class RandMsg(FastAPI): """ def __init__(self, app: FastAPI, util, constants) -> None: + """Initialize RandMsg endpoint.""" self.app: FastAPI = app self.util = util self.constants = constants @@ -30,8 +31,13 @@ class RandMsg(FastAPI): self, data: Optional[RandMsgRequest] = None ) -> JSONResponse: """ - Get a randomly generated message - - **short**: Optional, if True, will limit length of returned random messages to <=126 characters (Discord restriction related) + Get a randomly generated message. + + Parameters: + - **data** (Optional[RandMsgRequest]): Optional request data for short messages. + + Returns: + - **JSONResponse**: Contains a random message. """ random.seed() short: Optional[bool] = False diff --git a/endpoints/rip.py b/endpoints/rip.py index 5360c07..5a82949 100644 --- a/endpoints/rip.py +++ b/endpoints/rip.py @@ -33,6 +33,7 @@ class RIP(FastAPI): """ def __init__(self, app: FastAPI, my_util, constants) -> None: + """Initialize RIP endpoints.""" self.app: FastAPI = app self.util = my_util self.trip_util = SRUtil() @@ -72,7 +73,15 @@ class RIP(FastAPI): ) def _format_job(self, job: Job): - """Helper to normalize job data into JSON.""" + """ + Helper to normalize job data into JSON. + + Parameters: + - job (Job): The job object to format. + + Returns: + - dict: Contains normalized job data. + """ job_status: str | JobStatus = job.get_status() progress = job.meta.get("progress", 0) if progress == 100 and not job.meta.get("tarball"): @@ -82,7 +91,13 @@ class RIP(FastAPI): tracks_out = len(job.meta.get("tracks", [])) # `utils/rip_background.py` sets per-track status to 'Success' or 'Failed' # so check for 'success' case-insensitively and count matches. - succeeded_tracks = len([t for t in job.meta.get("tracks", []) if str(t.get("status", "")).lower() == "success"]) + succeeded_tracks = len( + [ + t + for t in job.meta.get("tracks", []) + if str(t.get("status", "")).lower() == "success" + ] + ) return { "id": job.id, @@ -103,10 +118,20 @@ class RIP(FastAPI): async def artists_by_name_handler( self, artist: str, request: Request, user=Depends(get_current_user) ) -> Response: - """Get artists by name""" + """ + Get artists by name. + + Parameters: + - **artist** (str): Artist name. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with artists or 404. + """ # support optional grouping to return one primary per display name # with `alternatives` for disambiguation (use ?group=true) - group = bool(request.query_params.get("group", False)) + group = bool(request.query_params.get("group", False)) artists = await self.trip_util.get_artists_by_name(artist, group=group) if not artists: return Response(status_code=404, content="Not found") @@ -115,7 +140,17 @@ class RIP(FastAPI): async def albums_by_artist_id_handler( self, artist_id: int, request: Request, user=Depends(get_current_user) ) -> Response: - """Get albums by artist ID""" + """ + Get albums by artist ID. + + Parameters: + - **artist_id** (int): Artist ID. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with albums or 404. + """ albums = await self.trip_util.get_albums_by_artist_id(artist_id) if not albums: return Response(status_code=404, content="Not found") @@ -128,7 +163,18 @@ class RIP(FastAPI): user=Depends(get_current_user), quality: Literal["FLAC", "Lossy"] = "FLAC", ) -> Response: - """Get tracks by album id""" + """ + Get tracks by album ID. + + Parameters: + - **album_id** (int): Album ID. + - **request** (Request): The request object. + - **quality** (Literal): Track quality ("FLAC" or "Lossy"). + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with tracks or 404. + """ tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality) if not tracks: return Response(status_code=404, content="Not Found") @@ -137,7 +183,18 @@ class RIP(FastAPI): async def tracks_by_artist_song_handler( self, artist: str, song: str, request: Request, user=Depends(get_current_user) ) -> Response: - """Get tracks by artist and song name""" + """ + Get tracks by artist and song name. + + Parameters: + - **artist** (str): Artist name. + - **song** (str): Song name. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with tracks or 404. + """ logging.critical("Searching for tracks by artist: %s, song: %s", artist, song) tracks = await self.trip_util.get_tracks_by_artist_song(artist, song) if not tracks: @@ -151,7 +208,18 @@ class RIP(FastAPI): quality: Literal["FLAC", "Lossy"] = "FLAC", user=Depends(get_current_user), ) -> Response: - """Get track by ID""" + """ + Get track by ID. + + Parameters: + - **track_id** (int): Track ID. + - **request** (Request): The request object. + - **quality** (Literal): Track quality ("FLAC" or "Lossy"). + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with stream URL or 404. + """ track = await self.trip_util.get_stream_url_by_track_id(track_id, quality) if not track: return Response(status_code=404, content="Not found") @@ -163,7 +231,17 @@ class RIP(FastAPI): request: Request, user=Depends(get_current_user), ) -> Response: - """Bulk fetch a list of track IDs""" + """ + Bulk fetch a list of track IDs. + + Parameters: + - **data** (ValidBulkFetchRequest): Bulk fetch request data. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **Response**: JSON response with job info or error. + """ if not data or not data.track_ids or not data.target: return JSONResponse( content={ @@ -204,7 +282,17 @@ class RIP(FastAPI): async def job_status_handler( self, job_id: str, request: Request, user=Depends(get_current_user) ): - """Get status and result of a single job""" + """ + Get status and result of a single job. + + Parameters: + - **job_id** (str): Job ID. + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **JSONResponse**: Job status and result or error. + """ job = None try: @@ -233,7 +321,16 @@ class RIP(FastAPI): return self._format_job(job) async def job_list_handler(self, request: Request, user=Depends(get_current_user)): - """List all jobs across all registries (queued, started, finished, failed, etc).""" + """ + List all jobs across all registries. + + Parameters: + - **request** (Request): The request object. + - **user**: Current user (dependency). + + Returns: + - **JSONResponse**: List of jobs. + """ jobs_info = [] seen = set() diff --git a/endpoints/transcriptions.py b/endpoints/transcriptions.py index a9ff6c5..37d7874 100644 --- a/endpoints/transcriptions.py +++ b/endpoints/transcriptions.py @@ -13,6 +13,7 @@ class Transcriptions(FastAPI): """ def __init__(self, app: FastAPI, util, constants) -> None: + """Initialize Transcriptions endpoints.""" self.app: FastAPI = app self.util = util self.constants = constants @@ -36,8 +37,13 @@ class Transcriptions(FastAPI): self, data: ValidShowEpisodeListRequest ) -> JSONResponse: """ - Get list of episodes by show id - - **s**: Show ID to query + Get list of episodes by show ID. + + Parameters: + - **data** (ValidShowEpisodeListRequest): Request containing show ID. + + Returns: + - **JSONResponse**: Contains a list of episodes. """ show_id: int = data.s db_path: Optional[Union[str, LiteralString]] = None @@ -106,9 +112,13 @@ class Transcriptions(FastAPI): self, data: ValidShowEpisodeLineRequest ) -> Response: """ - Get lines for a particular episode - - **s**: Show ID to query - - **e**: Episode ID to query + Get lines for a particular episode. + + Parameters: + - **data** (ValidShowEpisodeLineRequest): Request containing show and episode ID. + + Returns: + - **Response**: Episode lines. """ show_id: int = int(data.s) episode_id: int = int(data.e) diff --git a/endpoints/yt.py b/endpoints/yt.py index dd86b7c..d98489b 100644 --- a/endpoints/yt.py +++ b/endpoints/yt.py @@ -12,6 +12,7 @@ class YT(FastAPI): """ def __init__(self, app: FastAPI, util, constants) -> None: + """Initialize YT endpoints.""" self.app: FastAPI = app self.util = util self.constants = constants @@ -31,7 +32,15 @@ class YT(FastAPI): ) async def yt_video_search_handler(self, data: ValidYTSearchRequest) -> JSONResponse: - """Search for YT Video by Title (closest match returned)""" + """ + Search for YouTube video by title. + + Parameters: + - **data** (ValidYTSearchRequest): Request containing title. + + Returns: + - **JSONResponse**: Contains video ID or an error message. + """ title: str = data.t yts_res: Optional[list[dict]] = await self.ytsearch.search(title) diff --git a/test/add_cover_art.py b/test/add_cover_art.py index ad6d1bb..5eaeacf 100644 --- a/test/add_cover_art.py +++ b/test/add_cover_art.py @@ -15,19 +15,28 @@ from rapidfuzz import fuzz from music_tag import load_file # type: ignore from rich.console import Console from rich.table import Table -from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn, TaskProgressColumn +from rich.progress import ( + Progress, + BarColumn, + TextColumn, + TimeElapsedColumn, + TaskProgressColumn, +) # Local imports sys.path.insert(0, "..") from utils.sr_wrapper import SRUtil -import musicbrainzngs # type: ignore -from discogs_client import Client # type: ignore +import musicbrainzngs # type: ignore +from discogs_client import Client # type: ignore + # typing helper from typing import Any, cast, Optional + # Optional: use the popular `itunespy` PyPI package when available try: import itunespy # type: ignore + HAVE_ITUNESPY = True except Exception: itunespy = None @@ -36,6 +45,7 @@ except Exception: # Optional: use `spotipy` when available for Spotify lookups try: import spotipy # type: ignore + HAVE_SPOTIPY = True except Exception: spotipy = None @@ -45,52 +55,70 @@ except Exception: # Configurable paths and extensions MUSIC_DIR = Path("/storage/music2/completed/FLAC/review") -AUDIO_EXTS = {'.flac', '.mp3', '.m4a', '.ogg', '.wav', '.aac'} +AUDIO_EXTS = {".flac", ".mp3", ".m4a", ".ogg", ".wav", ".aac"} REPORT_CSV = "cover_art_report.csv" ALBUM_ART_CACHE: dict = {} + + # Reminder: If you see 'Import "music_tag" could not be resolved', run: # uv add music-tag -async def search_musicbrainz_cover(artist, album, session: aiohttp.ClientSession, limiter: 'AsyncRateLimiter'): +async def search_musicbrainz_cover( + artist, album, session: aiohttp.ClientSession, limiter: "AsyncRateLimiter" +): # Use musicbrainzngs to search for a release-group matching artist+album try: # search for release-groups using a thread to avoid blocking query = f"artist:{artist} AND release:{album}" try: - res = await asyncio.to_thread(musicbrainzngs.search_release_groups, query, 5) + res = await asyncio.to_thread( + musicbrainzngs.search_release_groups, query, 5 + ) except Exception: res = {} if COVER_DEBUG_QUERIES: try: - rgs_dbg = res.get('release-group-list') or [] + rgs_dbg = res.get("release-group-list") or [] dbg_info = [] for rg in rgs_dbg[:3]: - dbg_info.append({ - 'id': rg.get('id'), - 'title': rg.get('title'), - 'artist': artist_credit_to_name(rg.get('artist-credit', [])) - }) - console.print(f"[cyan][DEBUG] MusicBrainz candidates: {dbg_info}[/cyan]") + dbg_info.append( + { + "id": rg.get("id"), + "title": rg.get("title"), + "artist": artist_credit_to_name( + rg.get("artist-credit", []) + ), + } + ) + console.print( + f"[cyan][DEBUG] MusicBrainz candidates: {dbg_info}[/cyan]" + ) except Exception: pass - rgs = res.get('release-group-list') or [] + rgs = res.get("release-group-list") or [] if COVER_DEBUG_QUERIES: try: dbg_info = [] for rg in (rgs or [])[:3]: - dbg_info.append({ - 'id': rg.get('id'), - 'title': rg.get('title'), - 'artist': artist_credit_to_name(rg.get('artist-credit', [])) - }) - console.print(f"[cyan][DEBUG] MusicBrainz top candidates: {dbg_info}[/cyan]") + dbg_info.append( + { + "id": rg.get("id"), + "title": rg.get("title"), + "artist": artist_credit_to_name( + rg.get("artist-credit", []) + ), + } + ) + console.print( + f"[cyan][DEBUG] MusicBrainz top candidates: {dbg_info}[/cyan]" + ) except Exception: pass for rg in rgs: # try to get cover art via Cover Art Archive for releases in the group # check releases for a cover - releases = rg.get('release-list') or [] + releases = rg.get("release-list") or [] for rel in releases: - relid = rel.get('id') + relid = rel.get("id") if relid: caa_url = f"https://coverartarchive.org/release/{relid}/front-500" try: @@ -106,7 +134,10 @@ async def search_musicbrainz_cover(artist, album, session: aiohttp.ClientSession console.print(f"[red]MusicBrainz search exception: {e}[/red]") return None -async def search_discogs_cover(artist, album, session: aiohttp.ClientSession, limiter: 'AsyncRateLimiter'): + +async def search_discogs_cover( + artist, album, session: aiohttp.ClientSession, limiter: "AsyncRateLimiter" +): # Use discogs_client to search for releases matching artist+album try: if not DISCOGS_TOKEN: @@ -115,8 +146,12 @@ async def search_discogs_cover(artist, album, session: aiohttp.ClientSession, li try: await limiter.acquire() if COVER_DEBUG_QUERIES: - console.print(f"[cyan][DEBUG] Discogs query: album='{album}' artist='{artist}'") - results = await asyncio.to_thread(discogs_client.search, album, {'artist': artist, 'type': 'release'}) + console.print( + f"[cyan][DEBUG] Discogs query: album='{album}' artist='{artist}'" + ) + results = await asyncio.to_thread( + discogs_client.search, album, {"artist": artist, "type": "release"} + ) except Exception: results = [] if COVER_DEBUG_QUERIES: @@ -124,12 +159,15 @@ async def search_discogs_cover(artist, album, session: aiohttp.ClientSession, li dbg = [] for rr in (results or [])[:3]: try: - data = getattr(rr, 'data', {}) or {} - dbg.append({ - 'id': data.get('id'), - 'title': data.get('title') or getattr(rr, 'title', None), - 'cover_image': data.get('cover_image') - }) + data = getattr(rr, "data", {}) or {} + dbg.append( + { + "id": data.get("id"), + "title": data.get("title") + or getattr(rr, "title", None), + "cover_image": data.get("cover_image"), + } + ) except Exception: continue console.print(f"[cyan][DEBUG] Discogs candidates: {dbg}[/cyan]") @@ -142,21 +180,23 @@ async def search_discogs_cover(artist, album, session: aiohttp.ClientSession, li combined = f"{normalize_name(artist)} {normalize_name(album)}" if COVER_DEBUG_QUERIES: console.print(f"[cyan][DEBUG] Discogs fallback query: {combined}") - results = await asyncio.to_thread(discogs_client.search, combined, {'type': 'release'}) + results = await asyncio.to_thread( + discogs_client.search, combined, {"type": "release"} + ) except Exception: results = [] for r in results: # r.data may contain 'cover_image' or images cover = None try: - cover = r.data.get('cover_image') + cover = r.data.get("cover_image") except Exception: cover = None if not cover: # try images list - imgs = r.data.get('images') or [] - if imgs and isinstance(imgs, list) and imgs[0].get('uri'): - cover = imgs[0].get('uri') + imgs = r.data.get("images") or [] + if imgs and isinstance(imgs, list) and imgs[0].get("uri"): + cover = imgs[0].get("uri") if cover: # fetch image via aiohttp try: @@ -172,6 +212,7 @@ async def search_discogs_cover(artist, album, session: aiohttp.ClientSession, li console.print(f"[red]Discogs search exception: {e}[/red]") return None + # Load env once load_dotenv() @@ -179,11 +220,15 @@ load_dotenv() console = Console() # If set to '1'|'true', run only Spotify searches (useful for quick testing) -ONLY_SPOTIFY = os.getenv('ONLY_SPOTIFY', '').lower() in ('1', 'true', 'yes') +ONLY_SPOTIFY = os.getenv("ONLY_SPOTIFY", "").lower() in ("1", "true", "yes") # If set, print query strings and brief response info for debugging -COVER_DEBUG_QUERIES = os.getenv('COVER_DEBUG_QUERIES', '').lower() in ('1', 'true', 'yes') +COVER_DEBUG_QUERIES = os.getenv("COVER_DEBUG_QUERIES", "").lower() in ( + "1", + "true", + "yes", +) # If set, use more aggressive fuzzy thresholds and extra fallbacks -COVER_AGGRESSIVE = os.getenv('COVER_AGGRESSIVE', '').lower() in ('1', 'true', 'yes') +COVER_AGGRESSIVE = os.getenv("COVER_AGGRESSIVE", "").lower() in ("1", "true", "yes") def _log_attempt(artist, album, title, source, result): @@ -222,6 +267,7 @@ class AsyncRateLimiter: await asyncio.sleep(wait) self._last = asyncio.get_event_loop().time() + # Initialize MusicBrainz client musicbrainzngs.set_useragent("cover-art-script", "1.0", "your-email@example.com") @@ -229,6 +275,7 @@ musicbrainzngs.set_useragent("cover-art-script", "1.0", "your-email@example.com" DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN") discogs_client = Client("cover-art-script/1.0", user_token=DISCOGS_TOKEN) + # Define the log_api_response function at the top of the script async def log_api_response(api_name, response): """Log relevant parts of API responses for debugging purposes.""" @@ -240,55 +287,69 @@ async def log_api_response(api_name, response): { "id": rg.get("id"), "title": rg.get("title"), - "artist": artist_credit_to_name(rg.get("artist-credit", [])) + "artist": artist_credit_to_name(rg.get("artist-credit", [])), } for rg in release_groups ] - console.print(f"[cyan][DEBUG] {api_name} relevant response: {relevant_info}[/cyan]") + console.print( + f"[cyan][DEBUG] {api_name} relevant response: {relevant_info}[/cyan]" + ) elif api_name == "Discogs": results = data.get("results", []) relevant_info = [ { "id": result.get("id"), "title": result.get("title"), - "cover_image": result.get("cover_image") + "cover_image": result.get("cover_image"), } for result in results ] - console.print(f"[cyan][DEBUG] {api_name} relevant response: {relevant_info}[/cyan]") + console.print( + f"[cyan][DEBUG] {api_name} relevant response: {relevant_info}[/cyan]" + ) elif api_name == "iTunes": results = data.get("results", []) relevant_info = [ { "collectionId": result.get("collectionId"), "collectionName": result.get("collectionName"), - "artworkUrl100": result.get("artworkUrl100") + "artworkUrl100": result.get("artworkUrl100"), } for result in results ] - console.print(f"[cyan][DEBUG] {api_name} relevant response: {relevant_info}[/cyan]") + console.print( + f"[cyan][DEBUG] {api_name} relevant response: {relevant_info}[/cyan]" + ) else: console.print(f"[cyan][DEBUG] {api_name} response: {data}[/cyan]") except Exception as e: console.print(f"[red][DEBUG] Failed to parse {api_name} response: {e}[/red]") + # Helper to strip common parenthetical tags from album names def strip_album_tags(album): """Remove common parenthetical tags from the end of album names.""" pattern = r"\s*\((deluxe|remaster(ed)?|original mix|expanded|bonus|edition|version|mono|stereo|explicit|clean|anniversary|special|reissue|expanded edition|bonus track(s)?|international|digital|single|ep|live|instrumental|karaoke|radio edit|explicit version|clean version|acoustic|demo|re-recorded|remix|mix|edit|feat\.?|featuring|with .+|from .+|soundtrack|ost|score|session|vol(ume)? ?\d+|disc ?\d+|cd ?\d+|lp ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])\)$" return re.sub(pattern, "", album, flags=re.IGNORECASE).strip() + # Helper to strip common trailing tags like EP, LP, Single, Album, etc. from album names def strip_album_suffix(album): # Remove trailing tags like ' EP', ' LP', ' Single', ' Album', ' Remix', ' Version', etc. # Only if they appear at the end, case-insensitive, with or without punctuation suffix_pattern = r"[\s\-_:]*(ep|lp|single|album|remix|version|edit|mix|deluxe|expanded|anniversary|reissue|instrumental|karaoke|ost|score|session|mono|stereo|explicit|clean|bonus|disc ?\d+|cd ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])$" return re.sub(suffix_pattern, "", album, flags=re.IGNORECASE).strip() + + # iTunes/Apple Music API fallback (async) -async def search_itunes_cover(session: aiohttp.ClientSession, artist, album, limiter: 'AsyncRateLimiter'): +async def search_itunes_cover( + session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter" +): # Use only the `itunespy` library for iTunes album lookups. if not HAVE_ITUNESPY: - console.print(f"[yellow]iTunes: itunespy not available; skipping iTunes album search for '{artist} - {album}'[/yellow]") + console.print( + f"[yellow]iTunes: itunespy not available; skipping iTunes album search for '{artist} - {album}'[/yellow]" + ) return None try: @@ -297,12 +358,12 @@ async def search_itunes_cover(session: aiohttp.ClientSession, artist, album, lim def _search(): try: # try common itunespy APIs safely - if hasattr(mod, 'search_album'): + if hasattr(mod, "search_album"): return mod.search_album(f"{artist} {album}") - if hasattr(mod, 'Album') and hasattr(mod.Album, 'search'): + if hasattr(mod, "Album") and hasattr(mod.Album, "search"): return mod.Album.search(f"{artist} {album}") - if hasattr(mod, 'search'): - return mod.search(f"{artist} {album}", entity='album') + if hasattr(mod, "search"): + return mod.search(f"{artist} {album}", entity="album") return None except Exception: return None @@ -313,29 +374,36 @@ async def search_itunes_cover(session: aiohttp.ClientSession, artist, album, lim dbg = [] for a in (albums or [])[:3]: try: - aid = getattr(a, 'collectionId', None) or (a.get('collectionId') if isinstance(a, dict) else None) + aid = getattr(a, "collectionId", None) or ( + a.get("collectionId") if isinstance(a, dict) else None + ) except Exception: aid = None try: - aname = getattr(a, 'collectionName', None) or (a.get('collectionName') if isinstance(a, dict) else None) + aname = getattr(a, "collectionName", None) or ( + a.get("collectionName") if isinstance(a, dict) else None + ) except Exception: aname = None - dbg.append({'id': aid, 'name': aname}) + dbg.append({"id": aid, "name": aname}) console.print(f"[cyan][DEBUG] iTunes album candidates: {dbg}[/cyan]") except Exception: pass if not albums: if COVER_DEBUG_QUERIES: - console.print(f"[cyan][DEBUG] iTunes album: no results for '{artist} - {album}', trying normalized fallback") + console.print( + f"[cyan][DEBUG] iTunes album: no results for '{artist} - {album}', trying normalized fallback" + ) norm_q = f"{normalize_name(artist)} {normalize_name(album)}" + def _search_norm(): try: - if hasattr(mod, 'search_album'): + if hasattr(mod, "search_album"): return mod.search_album(norm_q) - if hasattr(mod, 'Album') and hasattr(mod.Album, 'search'): + if hasattr(mod, "Album") and hasattr(mod.Album, "search"): return mod.Album.search(norm_q) - if hasattr(mod, 'search'): - return mod.search(norm_q, entity='album') + if hasattr(mod, "search"): + return mod.search(norm_q, entity="album") return None except Exception: return None @@ -345,13 +413,15 @@ async def search_itunes_cover(session: aiohttp.ClientSession, artist, album, lim return None first = albums[0] - art_url = getattr(first, 'artwork_url', None) or getattr(first, 'artworkUrl100', None) + art_url = getattr(first, "artwork_url", None) or getattr( + first, "artworkUrl100", None + ) if not art_url: return None # Normalize to higher-res if possible - if '100x100' in art_url: - art_url = art_url.replace('100x100bb', '600x600bb') + if "100x100" in art_url: + art_url = art_url.replace("100x100bb", "600x600bb") await limiter.acquire() img_timeout = aiohttp.ClientTimeout(total=15) @@ -366,10 +436,14 @@ async def search_itunes_cover(session: aiohttp.ClientSession, artist, album, lim return None -async def search_itunes_track(session: aiohttp.ClientSession, artist, title, limiter: 'AsyncRateLimiter'): +async def search_itunes_track( + session: aiohttp.ClientSession, artist, title, limiter: "AsyncRateLimiter" +): # Use only the `itunespy` library for iTunes track lookups. if not HAVE_ITUNESPY: - console.print(f"[yellow]iTunes: itunespy not available; skipping iTunes track search for '{artist} - {title}'[/yellow]") + console.print( + f"[yellow]iTunes: itunespy not available; skipping iTunes track search for '{artist} - {title}'[/yellow]" + ) return None try: @@ -377,12 +451,12 @@ async def search_itunes_track(session: aiohttp.ClientSession, artist, title, lim def _search(): try: - if hasattr(mod, 'search_track'): + if hasattr(mod, "search_track"): return mod.search_track(f"{artist} {title}") - if hasattr(mod, 'Track') and hasattr(mod.Track, 'search'): + if hasattr(mod, "Track") and hasattr(mod.Track, "search"): return mod.Track.search(f"{artist} {title}") - if hasattr(mod, 'search'): - return mod.search(f"{artist} {title}", entity='song') + if hasattr(mod, "search"): + return mod.search(f"{artist} {title}", entity="song") return None except Exception: return None @@ -390,16 +464,19 @@ async def search_itunes_track(session: aiohttp.ClientSession, artist, title, lim tracks = await asyncio.to_thread(_search) if not tracks: if COVER_DEBUG_QUERIES: - console.print(f"[cyan][DEBUG] iTunes track: no results for '{artist} - {title}', trying normalized fallback") + console.print( + f"[cyan][DEBUG] iTunes track: no results for '{artist} - {title}', trying normalized fallback" + ) norm_q = f"{normalize_name(artist)} {normalize_name(title)}" + def _search_norm_track(): try: - if hasattr(mod, 'search_track'): + if hasattr(mod, "search_track"): return mod.search_track(norm_q) - if hasattr(mod, 'Track') and hasattr(mod.Track, 'search'): + if hasattr(mod, "Track") and hasattr(mod.Track, "search"): return mod.Track.search(norm_q) - if hasattr(mod, 'search'): - return mod.search(norm_q, entity='song') + if hasattr(mod, "search"): + return mod.search(norm_q, entity="song") return None except Exception: return None @@ -409,11 +486,13 @@ async def search_itunes_track(session: aiohttp.ClientSession, artist, title, lim return None first = tracks[0] - art_url = getattr(first, 'artwork_url', None) or getattr(first, 'artworkUrl100', None) + art_url = getattr(first, "artwork_url", None) or getattr( + first, "artworkUrl100", None + ) if not art_url: return None - if '100x100' in art_url: - art_url = art_url.replace('100x100bb', '600x600bb') + if "100x100" in art_url: + art_url = art_url.replace("100x100bb", "600x600bb") await limiter.acquire() img_timeout = aiohttp.ClientTimeout(total=15) @@ -428,11 +507,14 @@ async def search_itunes_track(session: aiohttp.ClientSession, artist, title, lim return None -async def search_deezer_cover(session: aiohttp.ClientSession, artist, album, limiter: 'AsyncRateLimiter'): +async def search_deezer_cover( + session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter" +): """Search Deezer for an album cover. Uses Deezer public API (no auth).""" try: # build simple query from urllib.parse import quote + query = f"{artist} {album}" if COVER_DEBUG_QUERIES: console.print(f"[cyan][DEBUG] Deezer query: {query}") @@ -443,17 +525,19 @@ async def search_deezer_cover(session: aiohttp.ClientSession, artist, album, lim if resp.status != 200: return None data = await resp.json() - items = data.get('data') or [] + items = data.get("data") or [] if COVER_DEBUG_QUERIES: try: dbg = [] for it in (items or [])[:3]: - dbg.append({ - 'id': it.get('id'), - 'title': it.get('title'), - 'cover_xl': it.get('cover_xl'), - 'cover_big': it.get('cover_big') - }) + dbg.append( + { + "id": it.get("id"), + "title": it.get("title"), + "cover_xl": it.get("cover_xl"), + "cover_big": it.get("cover_big"), + } + ) console.print(f"[cyan][DEBUG] Deezer candidates: {dbg}[/cyan]") except Exception: pass @@ -467,12 +551,14 @@ async def search_deezer_cover(session: aiohttp.ClientSession, artist, album, lim if resp2.status != 200: return None data2 = await resp2.json() - items = data2.get('data') or [] + items = data2.get("data") or [] if not items: return None first = items[0] # prefer XL or big covers - art_url = first.get('cover_xl') or first.get('cover_big') or first.get('cover') + art_url = ( + first.get("cover_xl") or first.get("cover_big") or first.get("cover") + ) if not art_url: return None await limiter.acquire() @@ -485,22 +571,27 @@ async def search_deezer_cover(session: aiohttp.ClientSession, artist, album, lim return None -async def search_lastfm_cover(session: aiohttp.ClientSession, artist, album, limiter: 'AsyncRateLimiter'): +async def search_lastfm_cover( + session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter" +): """Search Last.fm for album cover using album.getInfo. Requires LASTFM_API_KEY in env.""" - LASTFM_API_KEY = os.getenv('LASTFM_API_KEY') + LASTFM_API_KEY = os.getenv("LASTFM_API_KEY") if not LASTFM_API_KEY: - console.print(f"[yellow]LastFM: LASTFM_API_KEY not configured; skipping LastFM search for '{artist} - {album}'[/yellow]") + console.print( + f"[yellow]LastFM: LASTFM_API_KEY not configured; skipping LastFM search for '{artist} - {album}'[/yellow]" + ) return None try: params = { - 'method': 'album.getinfo', - 'api_key': LASTFM_API_KEY, - 'artist': artist, - 'album': album, - 'format': 'json', + "method": "album.getinfo", + "api_key": LASTFM_API_KEY, + "artist": artist, + "album": album, + "format": "json", } from urllib.parse import quote - qs = '&'.join(f"{quote(str(k))}={quote(str(v))}" for k, v in params.items()) + + qs = "&".join(f"{quote(str(k))}={quote(str(v))}" for k, v in params.items()) url = f"http://ws.audioscrobbler.com/2.0/?{qs}" await limiter.acquire() timeout = aiohttp.ClientTimeout(total=10) @@ -508,15 +599,15 @@ async def search_lastfm_cover(session: aiohttp.ClientSession, artist, album, lim if resp.status != 200: return None data = await resp.json() - album_data = data.get('album') or {} - images = album_data.get('image') or [] + album_data = data.get("album") or {} + images = album_data.get("image") or [] # images is a list of dicts with '#text' and 'size' art_url = None # prefer 'extralarge' or 'mega' - for size_name in ('mega', 'extralarge', 'large', 'medium'): + for size_name in ("mega", "extralarge", "large", "medium"): for img in images: - if img.get('size') == size_name and img.get('#text'): - art_url = img.get('#text') + if img.get("size") == size_name and img.get("#text"): + art_url = img.get("#text") break if art_url: break @@ -545,26 +636,35 @@ def get_spotify_client(): return _SPOTIFY_CLIENT if not HAVE_SPOTIPY: return None - client_id = os.getenv('SPOTIFY_CLIENT_ID') - client_secret = os.getenv('SPOTIFY_CLIENT_SECRET') + client_id = os.getenv("SPOTIFY_CLIENT_ID") + client_secret = os.getenv("SPOTIFY_CLIENT_SECRET") if not client_id or not client_secret: return None try: import importlib - sp_mod = importlib.import_module('spotipy') - creds_mod = importlib.import_module('spotipy.oauth2') - SpotifyClientCredentials = getattr(creds_mod, 'SpotifyClientCredentials', None) - SpotifyCls = getattr(sp_mod, 'Spotify', None) + + sp_mod = importlib.import_module("spotipy") + creds_mod = importlib.import_module("spotipy.oauth2") + SpotifyClientCredentials = getattr(creds_mod, "SpotifyClientCredentials", None) + SpotifyCls = getattr(sp_mod, "Spotify", None) if SpotifyClientCredentials is None or SpotifyCls is None: return None - creds = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret) + creds = SpotifyClientCredentials( + client_id=client_id, client_secret=client_secret + ) _SPOTIFY_CLIENT = SpotifyCls(client_credentials_manager=creds) return _SPOTIFY_CLIENT except Exception: return None -async def search_spotify_cover(session: aiohttp.ClientSession, artist, album, limiter: 'AsyncRateLimiter', isrc: Optional[str] = None): +async def search_spotify_cover( + session: aiohttp.ClientSession, + artist, + album, + limiter: "AsyncRateLimiter", + isrc: Optional[str] = None, +): """Search Spotify for album cover with multiple strategies: - If `isrc` provided, try track search by ISRC first. - Try quoted album+artist queries, then looser queries. @@ -573,10 +673,12 @@ async def search_spotify_cover(session: aiohttp.ClientSession, artist, album, li """ client = get_spotify_client() if client is None: - console.print(f"[yellow]Spotify: client not configured or spotipy not installed; skipping search for '{artist} - {album}'[/yellow]") + console.print( + f"[yellow]Spotify: client not configured or spotipy not installed; skipping search for '{artist} - {album}'[/yellow]" + ) return None - def _sp_search(q, typ='album', limit=3): + def _sp_search(q, typ="album", limit=3): try: return client.search(q=q, type=typ, limit=limit) except Exception: @@ -585,64 +687,93 @@ async def search_spotify_cover(session: aiohttp.ClientSession, artist, album, li try: # 1) ISRC search (track -> album) if isrc: - res = await asyncio.to_thread(_sp_search, f'isrc:{isrc}', 'track', 1) + res = await asyncio.to_thread(_sp_search, f"isrc:{isrc}", "track", 1) if res: - tracks = res.get('tracks', {}).get('items', []) + tracks = res.get("tracks", {}).get("items", []) if tracks: - album_obj = tracks[0].get('album') or {} - images = album_obj.get('images') or [] + album_obj = tracks[0].get("album") or {} + images = album_obj.get("images") or [] if images: # pick largest - best = max(images, key=lambda x: x.get('width') or 0) - art_url = best.get('url') + best = max(images, key=lambda x: x.get("width") or 0) + art_url = best.get("url") if art_url: await limiter.acquire() - async with session.get(art_url, timeout=aiohttp.ClientTimeout(total=15)) as img_resp: + async with session.get( + art_url, timeout=aiohttp.ClientTimeout(total=15) + ) as img_resp: if img_resp.status == 200: return await img_resp.read() # Prepare normalized variants for querying quoted_q = f'album:"{album}" artist:"{artist}"' - exact_q = f'artist:{artist} album:{album}' + exact_q = f"artist:{artist} album:{album}" norm_artist = normalize_name(artist) norm_album = normalize_name(album) - simple_q = f'album:{norm_album} artist:{norm_artist}' - queries = [quoted_q, exact_q, simple_q, f'album:"{album}"', f'artist:"{artist}"'] + simple_q = f"album:{norm_album} artist:{norm_artist}" + queries = [ + quoted_q, + exact_q, + simple_q, + f'album:"{album}"', + f'artist:"{artist}"', + ] for q in queries: - res = await asyncio.to_thread(_sp_search, q, 'album', 3) + res = await asyncio.to_thread(_sp_search, q, "album", 3) if not res: continue - albums = res.get('albums', {}).get('items', []) + albums = res.get("albums", {}).get("items", []) if COVER_DEBUG_QUERIES: try: dbg = [] for a in (albums or [])[:3]: - dbg.append({ - 'id': a.get('id'), - 'name': a.get('name'), - 'artists': [ar.get('name') for ar in (a.get('artists') or [])[:3] if ar.get('name')], - 'images': [img.get('url') for img in (a.get('images') or [])[:3]] - }) - console.print(f"[cyan][DEBUG] Spotify album candidates for query '{q}': {dbg}[/cyan]") + dbg.append( + { + "id": a.get("id"), + "name": a.get("name"), + "artists": [ + ar.get("name") + for ar in (a.get("artists") or [])[:3] + if ar.get("name") + ], + "images": [ + img.get("url") + for img in (a.get("images") or [])[:3] + ], + } + ) + console.print( + f"[cyan][DEBUG] Spotify album candidates for query '{q}': {dbg}[/cyan]" + ) except Exception: pass if not albums: continue # examine candidates and pick the best match via fuzzy matching for a in albums: - found_album = a.get('name') or '' - found_artist = ' '.join([ar.get('name') for ar in (a.get('artists') or []) if ar.get('name')]) - if is_fuzzy_match(artist, found_artist, threshold=75) and (not album or is_fuzzy_match(album, found_album, threshold=70)): - images = a.get('images') or [] + found_album = a.get("name") or "" + found_artist = " ".join( + [ + ar.get("name") + for ar in (a.get("artists") or []) + if ar.get("name") + ] + ) + if is_fuzzy_match(artist, found_artist, threshold=75) and ( + not album or is_fuzzy_match(album, found_album, threshold=70) + ): + images = a.get("images") or [] if not images: continue - best = max(images, key=lambda x: x.get('width') or 0) - art_url = best.get('url') + best = max(images, key=lambda x: x.get("width") or 0) + art_url = best.get("url") if art_url: await limiter.acquire() try: - async with session.get(art_url, timeout=aiohttp.ClientTimeout(total=15)) as img_resp: + async with session.get( + art_url, timeout=aiohttp.ClientTimeout(total=15) + ) as img_resp: if img_resp.status == 200: return await img_resp.read() except Exception: @@ -659,13 +790,27 @@ def is_fuzzy_match(expected, actual, threshold=80): return False return fuzz.token_set_ratio(expected.lower(), actual.lower()) >= threshold + # Fuzzy match for all fields -def is_metadata_match(expected_artist, expected_album, expected_title, found_artist, found_album, found_title, threshold=80): +def is_metadata_match( + expected_artist, + expected_album, + expected_title, + found_artist, + found_album, + found_title, + threshold=80, +): artist_match = is_fuzzy_match(expected_artist, found_artist, threshold) - album_match = is_fuzzy_match(expected_album, found_album, threshold) if expected_album else True + album_match = ( + is_fuzzy_match(expected_album, found_album, threshold) + if expected_album + else True + ) title_match = is_fuzzy_match(expected_title, found_title, threshold) return artist_match and album_match and title_match + # Utility to normalize artist/song names for searching def normalize_name(name): # Lowercase, strip, remove extra spaces, and remove common punctuation @@ -683,10 +828,14 @@ def artist_credit_to_name(ac): if isinstance(a, dict): # Common formats: {'name': 'Artist Name'} or {'artist': {'name': 'Artist Name'}} name = None - if a.get('name'): - name = a.get('name') - elif a.get('artist') and isinstance(a.get('artist'), dict) and a.get('artist', {}).get('name'): - name = a.get('artist', {}).get('name') + if a.get("name"): + name = a.get("name") + elif ( + a.get("artist") + and isinstance(a.get("artist"), dict) + and a.get("artist", {}).get("name") + ): + name = a.get("artist", {}).get("name") if name: parts.append(name) return " ".join(parts) @@ -709,12 +858,14 @@ for noisy_logger in [ logging.getLogger().setLevel(logging.CRITICAL) -async def fetch_srutil_cover(sr, artist, song, session: aiohttp.ClientSession, limiter: AsyncRateLimiter): +async def fetch_srutil_cover( + sr, artist, song, session: aiohttp.ClientSession, limiter: AsyncRateLimiter +): try: album = await sr.get_album_by_name(artist, song) - if not album or not album.get('id'): + if not album or not album.get("id"): return None - cover_url = await sr.get_cover_by_album_id(album['id'], 640) + cover_url = await sr.get_cover_by_album_id(album["id"], 640) if cover_url: await limiter.acquire() try: @@ -723,13 +874,17 @@ async def fetch_srutil_cover(sr, artist, song, session: aiohttp.ClientSession, l if resp.status == 200: return await resp.read() else: - console.print(f"[red]SRUtil: Failed to fetch cover art from URL (status {resp.status}): {cover_url}[/red]") + console.print( + f"[red]SRUtil: Failed to fetch cover art from URL (status {resp.status}): {cover_url}[/red]" + ) except Exception as e: console.print(f"[red]SRUtil: Exception fetching cover url: {e}[/red]") except Exception as e: msg = str(e) if "Cannot combine AUTHORIZATION header with AUTH argument" in msg: - console.print("[red]SRUtil: Skipping due to conflicting authentication method in dependency (AUTHORIZATION header + AUTH argument).[/red]") + console.print( + "[red]SRUtil: Skipping due to conflicting authentication method in dependency (AUTHORIZATION header + AUTH argument).[/red]" + ) else: console.print(f"[red]SRUtil: Exception: {e}[/red]") return None @@ -737,19 +892,21 @@ async def fetch_srutil_cover(sr, artist, song, session: aiohttp.ClientSession, l async def get_isrc(file): try: + def _read_isrc(): f = load_file(file) # music_tag may store ISRC under 'isrc' or 'ISRC' try: - val = f['isrc'].value + val = f["isrc"].value except Exception: try: - val = f['ISRC'].value + val = f["ISRC"].value except Exception: val = None if isinstance(val, list): return val[0] if val else None return val + return await asyncio.to_thread(_read_isrc) except Exception as e: console.print(f"[red]Error reading ISRC for {file}: {e}[/red]") @@ -772,17 +929,21 @@ async def search_musicbrainz_by_isrc(session, isrc, limiter: AsyncRateLimiter): data = await resp.json() except Exception: return None - recordings = data.get('recordings') or [] + recordings = data.get("recordings") or [] for rec in recordings: # try releases tied to this recording - releases = rec.get('releases') or [] + releases = rec.get("releases") or [] if releases: - relid = releases[0].get('id') + relid = releases[0].get("id") if relid: - caa_url = f"https://coverartarchive.org/release/{relid}/front-500" + caa_url = ( + f"https://coverartarchive.org/release/{relid}/front-500" + ) async with session.get(caa_url, timeout=timeout) as caa_resp: if caa_resp.status == 200: - console.print(f"[green]Found cover art via ISRC {isrc}[/green]") + console.print( + f"[green]Found cover art via ISRC {isrc}[/green]" + ) return await caa_resp.read() return None except Exception as e: @@ -793,6 +954,7 @@ async def search_musicbrainz_by_isrc(session, isrc, limiter: AsyncRateLimiter): # Concurrency limit for async processing CONCURRENCY = 18 + # Helper for formatting failure reasons in a consistent way def format_failure_reason(e, resp_status=None): """Format a failure reason from an exception or response status""" @@ -806,7 +968,10 @@ def format_failure_reason(e, resp_status=None): return str(e) return "no match found" -async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSession, limiters: dict): + +async def process_file( + file, sr, table, results, sem, session: aiohttp.ClientSession, limiters: dict +): """Process a single audio file to find and embed cover art.""" async with sem: if await has_cover(file): @@ -823,7 +988,9 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # Try ISRC-based lookup first isrc = await get_isrc(file) if isrc: - img = await search_musicbrainz_by_isrc(session, isrc, limiters['musicbrainz']) + img = await search_musicbrainz_by_isrc( + session, isrc, limiters["musicbrainz"] + ) if img: image_bytes = img source = f"MusicBrainz (ISRC:{isrc})" @@ -831,11 +998,15 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes ALBUM_ART_CACHE[album_key] = image_bytes _log_attempt(artist, album, title, source, "Success") else: - _log_attempt(artist, album, title, f"MusicBrainz (ISRC:{isrc})", "No match") + _log_attempt( + artist, album, title, f"MusicBrainz (ISRC:{isrc})", "No match" + ) # If ONLY_SPOTIFY testing mode is enabled, attempt only Spotify and return if ONLY_SPOTIFY: - img = await search_spotify_cover(session, artist, album, limiters['spotify'], isrc) + img = await search_spotify_cover( + session, artist, album, limiters["spotify"], isrc + ) if img: image_bytes = img source = "Spotify" @@ -845,13 +1016,19 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes file_basename = os.path.basename(file) ok = await embed_cover(file, image_bytes) if ok: - console.print(f"[green][FINAL RESULT] {file_basename} — {artist} / {album} | Success via {source}[/green]") + console.print( + f"[green][FINAL RESULT] {file_basename} — {artist} / {album} | Success via {source}[/green]" + ) else: status = "Embed Failed" - console.print(f"[red][FINAL RESULT] {file_basename} — {artist} / {album} | Embed Failed from {source}[/red]") + console.print( + f"[red][FINAL RESULT] {file_basename} — {artist} / {album} | Embed Failed from {source}[/red]" + ) else: _log_attempt(artist, album, title, "Spotify", "No match") - console.print(f"[yellow][FINAL RESULT] {os.path.basename(file)} — {artist} / {album} | No Spotify cover art found[/yellow]") + console.print( + f"[yellow][FINAL RESULT] {os.path.basename(file)} — {artist} / {album} | No Spotify cover art found[/yellow]" + ) table.add_row(file, status, source if source else "-") results.append([file, status, source if source else "-"]) @@ -859,7 +1036,9 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # SRUtil if not image_bytes: - img = await fetch_srutil_cover(sr, artist, album, session, limiters['srutil']) + img = await fetch_srutil_cover( + sr, artist, album, session, limiters["srutil"] + ) if img: image_bytes = img source = "SRUtil" @@ -871,7 +1050,9 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # MusicBrainz if not image_bytes: - img = await search_musicbrainz_cover(artist, album, session, limiters['musicbrainz']) + img = await search_musicbrainz_cover( + artist, album, session, limiters["musicbrainz"] + ) if img: image_bytes = img source = "MusicBrainz" @@ -883,7 +1064,9 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # Discogs if not image_bytes: - img = await search_discogs_cover(artist, album, session, limiters['discogs']) + img = await search_discogs_cover( + artist, album, session, limiters["discogs"] + ) if img: image_bytes = img source = "Discogs" @@ -895,7 +1078,7 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # Deezer if not image_bytes: - img = await search_deezer_cover(session, artist, album, limiters['deezer']) + img = await search_deezer_cover(session, artist, album, limiters["deezer"]) if img: image_bytes = img source = "Deezer" @@ -907,7 +1090,9 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # Spotify if not image_bytes: - img = await search_spotify_cover(session, artist, album, limiters['spotify'], isrc) + img = await search_spotify_cover( + session, artist, album, limiters["spotify"], isrc + ) if img: image_bytes = img source = "Spotify" @@ -919,7 +1104,7 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # iTunes album if not image_bytes: - img = await search_itunes_cover(session, artist, album, limiters['itunes']) + img = await search_itunes_cover(session, artist, album, limiters["itunes"]) if img: image_bytes = img source = "iTunes(album)" @@ -931,7 +1116,7 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # iTunes track if not image_bytes: - img = await search_itunes_track(session, artist, title, limiters['itunes']) + img = await search_itunes_track(session, artist, title, limiters["itunes"]) if img: image_bytes = img source = "iTunes(track)" @@ -943,7 +1128,7 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes # Last.fm if not image_bytes: - img = await search_lastfm_cover(session, artist, album, limiters['lastfm']) + img = await search_lastfm_cover(session, artist, album, limiters["lastfm"]) if img: image_bytes = img source = "LastFM" @@ -958,22 +1143,29 @@ async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSes if image_bytes and source: ok = await embed_cover(file, image_bytes) if ok: - console.print(f"[green][FINAL RESULT] {file_basename} — {artist} / {album} | Success via {source}[/green]") + console.print( + f"[green][FINAL RESULT] {file_basename} — {artist} / {album} | Success via {source}[/green]" + ) else: status = "Embed Failed" - console.print(f"[red][FINAL RESULT] {file_basename} — {artist} / {album} | Embed Failed from {source}[/red]") + console.print( + f"[red][FINAL RESULT] {file_basename} — {artist} / {album} | Embed Failed from {source}[/red]" + ) else: - console.print(f"[yellow][FINAL RESULT] {file_basename} — {artist} / {album} | No cover art found[/yellow]") + console.print( + f"[yellow][FINAL RESULT] {file_basename} — {artist} / {album} | No cover art found[/yellow]" + ) table.add_row(file, status, source if source else "-") results.append([file, status, source if source else "-"]) + async def has_cover(file): # Check if the audio file already has embedded cover art try: f = load_file(file) # music_tag stores artwork in 'artwork' which may be a list-like field - art = f['artwork'] + art = f["artwork"] # If there is any artwork, consider it present try: return bool(art.first) @@ -983,28 +1175,31 @@ async def has_cover(file): except Exception: return False + async def get_artist_album_title(file): # Extract artist, album, and title from audio file tags try: f = load_file(file) - artist = str(f['artist'].first) if f['artist'].first else "Unknown Artist" - album = str(f['album'].first) if f['album'].first else "Unknown Album" - title = str(f['title'].first) if f['title'].first else "Unknown Title" + artist = str(f["artist"].first) if f["artist"].first else "Unknown Artist" + album = str(f["album"].first) if f["album"].first else "Unknown Album" + title = str(f["title"].first) if f["title"].first else "Unknown Title" return artist, album, title except Exception: return "Unknown Artist", "Unknown Album", "Unknown Title" + async def embed_cover(file, image_bytes): # Embed cover art into audio file metadata using music_tag try: f = load_file(file) - f['artwork'] = image_bytes + f["artwork"] = image_bytes f.save() return True except Exception as e: console.print(f"[red][ERROR] Failed to embed cover: {e}[/red]") return False + async def main(): try: console.print(f"[bold blue]Scanning directory: {MUSIC_DIR}[/bold blue]") @@ -1022,7 +1217,6 @@ async def main(): table.add_column("Status", style="green") table.add_column("Source", style="magenta") - # create rate limiters (seconds between requests) RATE_SRUTIL = 0.1 RATE_MUSICBRAINZ = 1.0 @@ -1033,13 +1227,13 @@ async def main(): RATE_SPOTIFY = 0.5 limiters = { - 'srutil': AsyncRateLimiter(RATE_SRUTIL), - 'musicbrainz': AsyncRateLimiter(RATE_MUSICBRAINZ), - 'itunes': AsyncRateLimiter(RATE_ITUNES), - 'discogs': AsyncRateLimiter(RATE_DISCOGS), - 'deezer': AsyncRateLimiter(RATE_DEEZER), - 'lastfm': AsyncRateLimiter(RATE_LASTFM), - 'spotify': AsyncRateLimiter(RATE_SPOTIFY), + "srutil": AsyncRateLimiter(RATE_SRUTIL), + "musicbrainz": AsyncRateLimiter(RATE_MUSICBRAINZ), + "itunes": AsyncRateLimiter(RATE_ITUNES), + "discogs": AsyncRateLimiter(RATE_DISCOGS), + "deezer": AsyncRateLimiter(RATE_DEEZER), + "lastfm": AsyncRateLimiter(RATE_LASTFM), + "spotify": AsyncRateLimiter(RATE_SPOTIFY), } sem = asyncio.Semaphore(CONCURRENCY) @@ -1056,7 +1250,9 @@ async def main(): return str(e) return "no match found" - async def worker(file, sr, table, results, sem, progress, task_id, session, limiters): + async def worker( + file, sr, table, results, sem, progress, task_id, session, limiters + ): await process_file(file, sr, table, results, sem, session, limiters) progress.update(task_id, advance=1) @@ -1069,7 +1265,22 @@ async def main(): ) as progress: task_id = progress.add_task("Processing files...", total=len(files)) # Schedule all workers - await asyncio.gather(*(worker(file, sr, table, results, sem, progress, task_id, session, limiters) for file in files)) + await asyncio.gather( + *( + worker( + file, + sr, + table, + results, + sem, + progress, + task_id, + session, + limiters, + ) + for file in files + ) + ) # Print summary table and CSV after progress bar console.print(table) @@ -1083,5 +1294,6 @@ async def main(): traceback.print_exc() sys.exit(1) + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/test/test_search_track.py b/test/test_search_track.py index 6d61380..36aebcc 100644 --- a/test/test_search_track.py +++ b/test/test_search_track.py @@ -1,6 +1,7 @@ import asyncio import logging import sys + sys.path.insert(0, "..") from utils.sr_wrapper import SRUtil @@ -8,11 +9,16 @@ from utils.sr_wrapper import SRUtil logger = logging.getLogger() logger.setLevel(logging.CRITICAL) + async def main(): sr = SRUtil() artist_search = await sr.get_artists_by_name("Ren") # logging.critical("Artist search: %s", artist_search) - res = [dict(x) for x in artist_search if x.get('popularity', 0) and x.get('artist').lower() == 'ren'] + res = [ + dict(x) + for x in artist_search + if x.get("popularity", 0) and x.get("artist").lower() == "ren" + ] logging.critical("Results: %s", res) # search_res = await sr.get_album_by_name(artist[:8], album) # logging.critical("Search result: %s", search_res) @@ -23,4 +29,4 @@ async def main(): return -asyncio.run(main()) \ No newline at end of file +asyncio.run(main()) diff --git a/util.py b/util.py index 5620df4..48ef321 100644 --- a/util.py +++ b/util.py @@ -18,23 +18,45 @@ class Utilities: def get_blocked_response(self, path: Optional[str] = None): """ - Get Blocked HTTP Response + Return a redirect response for blocked requests. + + Args: + path (Optional[str]): The requested path (currently unused). + + Returns: + RedirectResponse: A redirect to the blocked URI. """ logging.error("Rejected request: Blocked") return RedirectResponse(url=self.blocked_redirect_uri) def get_no_endpoint_found(self, path: Optional[str] = None): """ - Get 404 Response + Raise an HTTP 404 exception for unknown endpoints. + + Args: + path (Optional[str]): The requested path (currently unused). + + Raises: + HTTPException: With status code 404 and detail "Unknown endpoint". """ logging.error("Rejected request: No such endpoint") raise HTTPException(detail="Unknown endpoint", status_code=404) def check_key(self, path: str, key: str, req_type: int = 0) -> bool: """ - Accepts path as an argument to allow fine tuning access for each API key, not currently in use. - """ + Check if the provided API key is valid and meets the requirements. + Args: + path (str): The request path (reserved for future fine-tuning). + key (str): The authorization header value, expected to start with "Bearer ". + req_type (int): The type of access required. + 0: Basic access. + 2: Private access (key must start with "PRV-"). + 4: Radio access (key must start with "RAD-"). + + Returns: + bool: True if the key is valid and meets the requirements, False otherwise. + """ if not key or not key.startswith("Bearer "): return False diff --git a/utils/radio_util.py b/utils/radio_util.py index c810396..9ae11e9 100644 --- a/utils/radio_util.py +++ b/utils/radio_util.py @@ -223,6 +223,7 @@ class RadioUtil: "artist": double_space.sub(" ", result["artist"].strip()), "song": double_space.sub(" ", result["song"].strip()), "artistsong": result["artistsong"].strip(), + "album": result["album"].strip() if result["album"] else "N/A", "genre": self.get_genre( double_space.sub(" ", result["artist"].strip()) ), diff --git a/utils/rip_background.py b/utils/rip_background.py index 7e83176..5519351 100644 --- a/utils/rip_background.py +++ b/utils/rip_background.py @@ -47,7 +47,13 @@ sr = SRUtil() # ---------- Discord helper ---------- -async def discord_notify(webhook_url: str, title: str, description: str, target: Optional[str] = None, color: int = 0x00FF00): +async def discord_notify( + webhook_url: str, + title: str, + description: str, + target: Optional[str] = None, + color: int = 0x00FF00, +): embed = { "title": title, "description": description[:1900] if description else "", @@ -64,15 +70,20 @@ async def discord_notify(webhook_url: str, title: str, description: str, target: while True: # permanent retry try: async with aiohttp.ClientSession() as session: - async with session.post(webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp: + async with session.post( + webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: if resp.status >= 400: text = await resp.text() - raise RuntimeError(f"Discord webhook failed ({resp.status}): {text}") + raise RuntimeError( + f"Discord webhook failed ({resp.status}): {text}" + ) break except Exception as e: print(f"Discord send failed, retrying: {e}") await asyncio.sleep(5) + def send_log_to_discord(message: str, level: str, target: Optional[str] = None): colors = {"WARNING": 0xFFA500, "ERROR": 0xFF0000, "CRITICAL": 0xFF0000} color = colors.get(level.upper(), 0xFFFF00) @@ -83,7 +94,7 @@ def send_log_to_discord(message: str, level: str, target: Optional[str] = None): title=f"{level} in bulk_download", description=message, target=target, - color=color + color=color, ) try: @@ -98,6 +109,7 @@ def send_log_to_discord(message: str, level: str, target: Optional[str] = None): # ---------- Helpers ---------- def tag_with_mediafile(file_path: str, meta: dict): f = MediaFile(file_path) + def safe_set(attr, value, default=None, cast=None): if value is None: value = default @@ -106,6 +118,7 @@ def tag_with_mediafile(file_path: str, meta: dict): setattr(f, attr, cast(value)) else: setattr(f, attr, str(value)) + safe_set("title", meta.get("title"), default="Unknown Title") safe_set("artist", meta.get("artist"), default="Unknown Artist") safe_set("albumartist", meta.get("album_artist"), default="Unknown Artist") @@ -136,6 +149,7 @@ def tag_with_mediafile(file_path: str, meta: dict): if not cover_bytes and cover_url: try: import requests + resp = requests.get(cover_url, timeout=10) resp.raise_for_status() cover_bytes = resp.content @@ -188,7 +202,7 @@ def ensure_unique_filename_in_dir(parent: Path, filename: str) -> Path: # special-case .tar.gz if filename.lower().endswith(".tar.gz"): ext = ".tar.gz" - base = filename[:-len(ext)] + base = filename[: -len(ext)] else: p = Path(filename) ext = p.suffix @@ -235,13 +249,15 @@ def bulk_download(track_list: list, quality: str = "FLAC"): send_log_to_discord(f"Failed to init job.meta: {e}", "WARNING", target) # Job started Discord message - asyncio.run(discord_notify( - DISCORD_WEBHOOK, - title=f"Job Started: {job_id}", - description=f"Processing `{len(track_list)}` track(s)", - target=target, - color=0x00FFFF - )) + asyncio.run( + discord_notify( + DISCORD_WEBHOOK, + title=f"Job Started: {job_id}", + description=f"Processing `{len(track_list)}` track(s)", + target=target, + color=0x00FFFF, + ) + ) async def process_tracks(): per_track_meta = [] @@ -253,7 +269,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"): # Set up a one-time rate-limit callback to notify on the first 429 seen by SRUtil async def _rate_limit_notify(exc: Exception): try: - send_log_to_discord(f"Rate limit observed while fetching metadata: {exc}", "WARNING", target) + send_log_to_discord( + f"Rate limit observed while fetching metadata: {exc}", + "WARNING", + target, + ) except Exception: pass @@ -265,7 +285,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"): pass total = len(track_list or []) for i, track_id in enumerate(track_list or []): - track_info = {"track_id": str(track_id), "status": "Pending", "file_path": None, "error": None, "attempts": 0} + track_info = { + "track_id": str(track_id), + "status": "Pending", + "file_path": None, + "error": None, + "attempts": 0, + } attempt = 0 while attempt < MAX_RETRIES: @@ -326,13 +352,19 @@ def bulk_download(track_list: list, quality: str = "FLAC"): # Try to fetch cover art via SRUtil (use album_id from metadata) try: album_field = md.get("album") - album_id = md.get("album_id") or (album_field.get("id") if isinstance(album_field, dict) else None) + album_id = md.get("album_id") or ( + album_field.get("id") + if isinstance(album_field, dict) + else None + ) except Exception: album_id = None if album_id: try: - cover_url = await sr.get_cover_by_album_id(album_id, size=640) + cover_url = await sr.get_cover_by_album_id( + album_id, size=640 + ) except Exception: cover_url = None else: @@ -344,7 +376,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"): if cover_url: try: timeout = aiohttp.ClientTimeout(total=15) - async with session.get(cover_url, timeout=timeout) as img_resp: + async with session.get( + cover_url, timeout=timeout + ) as img_resp: if img_resp.status == 200: img_bytes = await img_resp.read() else: @@ -375,23 +409,24 @@ def bulk_download(track_list: list, quality: str = "FLAC"): # Prefer music_tag if available (keeps compatibility with add_cover_art.py) try: from music_tag import load_file as mt_load_file # type: ignore + try: mf = mt_load_file(str(final_file)) # set basic tags - if md.get('title'): - mf['title'] = md.get('title') - if md.get('artist'): - mf['artist'] = md.get('artist') - if md.get('album'): - mf['album'] = md.get('album') - tracknum = md.get('track_number') + if md.get("title"): + mf["title"] = md.get("title") + if md.get("artist"): + mf["artist"] = md.get("artist") + if md.get("album"): + mf["album"] = md.get("album") + tracknum = md.get("track_number") if tracknum is not None: try: - mf['tracknumber'] = int(tracknum) + mf["tracknumber"] = int(tracknum) except Exception: pass if img_bytes: - mf['artwork'] = img_bytes + mf["artwork"] = img_bytes mf.save() embedded = True except Exception: @@ -438,7 +473,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"): wait_time = min(60, 2**attempt) await asyncio.sleep(wait_time) else: - await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) + await asyncio.sleep( + random.uniform(THROTTLE_MIN, THROTTLE_MAX) + ) except Exception as e: tb = traceback.format_exc() @@ -447,7 +484,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"): track_info["error"] = str(e) if attempt >= MAX_RETRIES: track_info["status"] = "Failed" - send_log_to_discord(f"Track {track_id} failed after {attempt} attempts", "ERROR", target) + send_log_to_discord( + f"Track {track_id} failed after {attempt} attempts", + "ERROR", + target, + ) await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX)) finally: @@ -464,7 +505,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"): job.meta["tarball"] = None job.meta["status"] = "Failed" job.save_meta() - send_log_to_discord(f"No tracks were successfully downloaded for job `{job_id}`", "CRITICAL", target) + send_log_to_discord( + f"No tracks were successfully downloaded for job `{job_id}`", + "CRITICAL", + target, + ) return [] # Tarball creation @@ -476,7 +521,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"): except Exception: artist = "Unknown Artist" artist_counts[artist] = artist_counts.get(artist, 0) + 1 - top_artist = sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0] if artist_counts else "Unknown Artist" + top_artist = ( + sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0] + if artist_counts + else "Unknown Artist" + ) # Prefer `job.meta['target']` when provided by the enqueuer. Fall back to the top artist. target_name = None try: @@ -485,7 +534,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"): except Exception: target_name = None - base_label = sanitize_filename(target_name) if target_name else sanitize_filename(top_artist) + base_label = ( + sanitize_filename(target_name) + if target_name + else sanitize_filename(top_artist) + ) staged_tarball = staging_root / f"{base_label}.tar.gz" counter = 1 @@ -504,14 +557,24 @@ def bulk_download(track_list: list, quality: str = "FLAC"): job.save_meta() logging.info("Creating tarball: %s", staged_tarball) - await discord_notify(DISCORD_WEBHOOK, - title=f"Compressing: Job {job_id}", - description=f"Creating tarball: `{len(all_final_files)}` track(s).\nStaging path: {staged_tarball}", - color=0xFFA500, - target=target) + await discord_notify( + DISCORD_WEBHOOK, + title=f"Compressing: Job {job_id}", + description=f"Creating tarball: `{len(all_final_files)}` track(s).\nStaging path: {staged_tarball}", + color=0xFFA500, + target=target, + ) try: subprocess.run( - ["tar", "-I", "pigz -9", "-cf", str(staged_tarball), "-C", str(staging_root)] + [ + "tar", + "-I", + "pigz -9", + "-cf", + str(staged_tarball), + "-C", + str(staging_root), + ] + [str(f.relative_to(staging_root)) for f in all_final_files], check=True, ) @@ -521,7 +584,11 @@ def bulk_download(track_list: list, quality: str = "FLAC"): except Exception: pass except FileNotFoundError: - send_log_to_discord("pigz not available, falling back to tarfile (slower).", "WARNING", target) + send_log_to_discord( + "pigz not available, falling back to tarfile (slower).", + "WARNING", + target, + ) with tarfile.open(staged_tarball, "w:gz") as tar: for f in all_final_files: try: @@ -535,7 +602,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"): pass if not staged_tarball.exists(): - send_log_to_discord(f"Tarball was not created: `{staged_tarball}`", "CRITICAL", target) + send_log_to_discord( + f"Tarball was not created: `{staged_tarball}`", "CRITICAL", target + ) if job: job.meta["status"] = "compress_failed" job.save_meta() @@ -556,13 +625,13 @@ def bulk_download(track_list: list, quality: str = "FLAC"): # Job completed Discord message completed = len(all_final_files) - failed = (len(track_list) - completed) + failed = len(track_list) - completed await discord_notify( DISCORD_WEBHOOK, title=f"Job Completed: {job_id}", description=f"Processed `{len(track_list)}` track(s).\nCompleted: `{completed}`\nFailed: `{failed}`\nTarball: `{final_tarball}`", target=target, - color=0x00FF00 + color=0x00FF00, ) return [str(final_tarball)] @@ -572,7 +641,9 @@ def bulk_download(track_list: list, quality: str = "FLAC"): try: return loop.run_until_complete(process_tracks()) except Exception as e: - send_log_to_discord(f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target) + send_log_to_discord( + f"bulk_download failed: {e}\n{traceback.format_exc()}", "CRITICAL", target + ) if job: job.meta["status"] = "Failed" job.save_meta() diff --git a/utils/sr_wrapper.py b/utils/sr_wrapper.py index 79e3d64..f0d3791 100644 --- a/utils/sr_wrapper.py +++ b/utils/sr_wrapper.py @@ -30,7 +30,6 @@ for name in [__name__, "utils.sr_wrapper"]: logging.getLogger().setLevel(logging.CRITICAL) - load_dotenv() @@ -66,7 +65,9 @@ class SRUtil: self.streamrip_client = TidalClient(self.streamrip_config) self.MAX_CONCURRENT_METADATA_REQUESTS = 2 self.METADATA_RATE_LIMIT = 1.25 - self.METADATA_SEMAPHORE = asyncio.Semaphore(self.MAX_CONCURRENT_METADATA_REQUESTS) + self.METADATA_SEMAPHORE = asyncio.Semaphore( + self.MAX_CONCURRENT_METADATA_REQUESTS + ) self.LAST_METADATA_REQUEST = 0 self.MAX_METADATA_RETRIES = 5 self.METADATA_ALBUM_CACHE: dict[str, dict] = {} @@ -77,16 +78,18 @@ class SRUtil: self._rate_limit_notified = False async def rate_limited_request(self, func, *args, **kwargs): - async with self.METADATA_SEMAPHORE: - now = time.time() - elapsed = now - self.LAST_METADATA_REQUEST - if elapsed < self.METADATA_RATE_LIMIT: - await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed) - result = await func(*args, **kwargs) - self.LAST_METADATA_REQUEST = time.time() - return result + async with self.METADATA_SEMAPHORE: + now = time.time() + elapsed = now - self.LAST_METADATA_REQUEST + if elapsed < self.METADATA_RATE_LIMIT: + await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed) + result = await func(*args, **kwargs) + self.LAST_METADATA_REQUEST = time.time() + return result - async def _safe_api_call(self, func, *args, retries: int = 2, backoff: float = 0.5, **kwargs): + async def _safe_api_call( + self, func, *args, retries: int = 2, backoff: float = 0.5, **kwargs + ): """Call an async API function with resilient retry behavior. - On AttributeError: attempt a `login()` once and retry. @@ -116,7 +119,11 @@ class SRUtil: if ("400" in msg or "429" in msg) and attempt < retries - 1: # Notify on the first observed 429 (if a callback is set) try: - if "429" in msg and not self._rate_limit_notified and self.on_rate_limit: + if ( + "429" in msg + and not self._rate_limit_notified + and self.on_rate_limit + ): self._rate_limit_notified = True try: if asyncio.iscoroutinefunction(self.on_rate_limit): @@ -128,17 +135,29 @@ class SRUtil: pass except Exception: pass - await asyncio.sleep(backoff * (2 ** attempt)) + await asyncio.sleep(backoff * (2**attempt)) continue # Connection related errors — try to re-login then retry - if isinstance(e, (aiohttp.ClientError, OSError, ConnectionError, asyncio.TimeoutError)) or "Connection" in msg or "closed" in msg.lower(): + if ( + isinstance( + e, + ( + aiohttp.ClientError, + OSError, + ConnectionError, + asyncio.TimeoutError, + ), + ) + or "Connection" in msg + or "closed" in msg.lower() + ): try: await self.streamrip_client.login() except Exception: pass if attempt < retries - 1: - await asyncio.sleep(backoff * (2 ** attempt)) + await asyncio.sleep(backoff * (2**attempt)) continue # Unhandled / permanent error: re-raise after loop ends @@ -151,10 +170,23 @@ class SRUtil: if not expected or not actual: return False return fuzz.token_set_ratio(expected.lower(), actual.lower()) >= threshold - - def is_metadata_match(self, expected_artist, expected_album, expected_title, found_artist, found_album, found_title, threshold=80): + + def is_metadata_match( + self, + expected_artist, + expected_album, + expected_title, + found_artist, + found_album, + found_title, + threshold=80, + ): artist_match = self.is_fuzzy_match(expected_artist, found_artist, threshold) - album_match = self.is_fuzzy_match(expected_album, found_album, threshold) if expected_album else True + album_match = ( + self.is_fuzzy_match(expected_album, found_album, threshold) + if expected_album + else True + ) title_match = self.is_fuzzy_match(expected_title, found_title, threshold) return artist_match and album_match and title_match @@ -166,7 +198,9 @@ class SRUtil: deduped[norm] = entry return list(deduped.values()) - def group_artists_by_name(self, entries: list[dict], query: Optional[str] = None) -> list[dict]: + def group_artists_by_name( + self, entries: list[dict], query: Optional[str] = None + ) -> list[dict]: """ Group artist entries by normalized display name and pick a primary candidate per name. @@ -199,10 +233,15 @@ class SRUtil: score = 0.0 if query: try: - if it.get("artist", "").strip().lower() == query.strip().lower(): + if ( + it.get("artist", "").strip().lower() + == query.strip().lower() + ): score += 1000.0 else: - score += float(fuzz.token_set_ratio(query, it.get("artist", ""))) + score += float( + fuzz.token_set_ratio(query, it.get("artist", "")) + ) except Exception: score += 0.0 # add small weight for popularity if present @@ -216,12 +255,14 @@ class SRUtil: primary = scored[0][1] alternatives = [it for _, it in scored[1:]] - out.append({ - "artist": primary.get("artist"), - "id": primary.get("id"), - "popularity": primary.get("popularity"), - "alternatives": alternatives, - }) + out.append( + { + "artist": primary.get("artist"), + "id": primary.get("id"), + "popularity": primary.get("popularity"), + "alternatives": alternatives, + } + ) return out @@ -230,14 +271,16 @@ class SRUtil: return None m, s = divmod(seconds, 60) return f"{m}:{s:02}" - + def _get_tidal_cover_url(self, uuid, size): """Generate a tidal cover url. :param uuid: VALID uuid string :param size: """ - TIDAL_COVER_URL = "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" + TIDAL_COVER_URL = ( + "https://resources.tidal.com/images/{uuid}/{width}x{height}.jpg" + ) possibles = (80, 160, 320, 640, 1280) assert size in possibles, f"size must be in {possibles}" return TIDAL_COVER_URL.format( @@ -246,8 +289,6 @@ class SRUtil: width=size, ) - - def combine_album_track_metadata( self, album_json: dict | None, track_json: dict ) -> dict: @@ -288,10 +329,14 @@ class SRUtil: "peak": track_json.get("peak"), "lyrics": track_json.get("lyrics"), "track_copyright": track_json.get("copyright"), - "cover_id": track_json.get("album", {}).get("cover") or album_json.get("cover"), + "cover_id": track_json.get("album", {}).get("cover") + or album_json.get("cover"), "cover_url": ( f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg" - if (track_json.get("album", {}).get("cover") or album_json.get("cover")) + if ( + track_json.get("album", {}).get("cover") + or album_json.get("cover") + ) else None ), } @@ -299,7 +344,6 @@ class SRUtil: return combined - def combine_album_with_all_tracks( self, album_json: dict[str, Any] ) -> list[dict[str, Any]]: @@ -309,7 +353,9 @@ class SRUtil: for t in album_json.get("tracks", []) ] - async def get_artists_by_name(self, artist_name: str, group: bool = False) -> Optional[list]: + async def get_artists_by_name( + self, artist_name: str, group: bool = False + ) -> Optional[list]: """Get artist(s) by name. Args: @@ -324,7 +370,12 @@ class SRUtil: delay = 1.0 for attempt in range(max_retries): try: - artists = await self._safe_api_call(self.streamrip_client.search, media_type="artist", query=artist_name, retries=3) + artists = await self._safe_api_call( + self.streamrip_client.search, + media_type="artist", + query=artist_name, + retries=3, + ) break except Exception as e: msg = str(e) @@ -344,7 +395,9 @@ class SRUtil: artists_page = artists[0] if len(artists) > 0 else {} else: artists_page = artists - artists_items = artists_page.get("items", []) if isinstance(artists_page, dict) else [] + artists_items = ( + artists_page.get("items", []) if isinstance(artists_page, dict) else [] + ) if not artists_items: return None artists_out = [ @@ -365,13 +418,19 @@ class SRUtil: async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]: """Get albums by artist ID. Retry login only on authentication failure. Rate limit and retry on 400/429.""" import asyncio + artist_id_str: str = str(artist_id) albums_out: list[dict] = [] max_retries = 4 delay = 1.0 for attempt in range(max_retries): try: - metadata = await self._safe_api_call(self.streamrip_client.get_metadata, artist_id_str, "artist", retries=3) + metadata = await self._safe_api_call( + self.streamrip_client.get_metadata, + artist_id_str, + "artist", + retries=3, + ) break except Exception as e: msg = str(e) @@ -397,10 +456,10 @@ class SRUtil: if "title" in album and "id" in album and "artists" in album ] return albums_out - + async def get_album_by_name(self, artist: str, album: str) -> Optional[dict]: """Get album by artist and album name using artist ID and fuzzy matching. Try first 8 chars, then 12 if no match. Notify on success.""" - # Notification moved to add_cover_art.py as requested + # Notification moved to add_cover_art.py as requested for trunc in (8, 12): search_artist = artist[:trunc] artists = await self.get_artists_by_name(search_artist) @@ -429,15 +488,22 @@ class SRUtil: if best_album and best_album_score >= 85: return best_album return None - - async def get_cover_by_album_id(self, album_id: int, size: int = 640) -> Optional[str]: + + async def get_cover_by_album_id( + self, album_id: int, size: int = 640 + ) -> Optional[str]: """Get cover URL by album ID. Retry login only on authentication failure.""" if size not in [80, 160, 320, 640, 1280]: return None album_id_str: str = str(album_id) for attempt in range(2): try: - metadata = await self._safe_api_call(self.streamrip_client.get_metadata, item_id=album_id_str, media_type="album", retries=2) + metadata = await self._safe_api_call( + self.streamrip_client.get_metadata, + item_id=album_id_str, + media_type="album", + retries=2, + ) break except Exception: if attempt == 1: @@ -452,7 +518,6 @@ class SRUtil: cover_url = self._get_tidal_cover_url(cover_id, size) return cover_url - async def get_tracks_by_album_id( self, album_id: int, quality: str = "FLAC" ) -> Optional[list | dict]: @@ -464,7 +529,12 @@ class SRUtil: """ album_id_str = str(album_id) try: - metadata = await self._safe_api_call(self.streamrip_client.get_metadata, item_id=album_id_str, media_type="album", retries=2) + metadata = await self._safe_api_call( + self.streamrip_client.get_metadata, + item_id=album_id_str, + media_type="album", + retries=2, + ) except Exception as e: logging.warning("get_tracks_by_album_id failed: %s", e) return None @@ -486,7 +556,9 @@ class SRUtil: ] return tracks_out - async def get_tracks_by_artist_song(self, artist: str, song: str, n: int = 0) -> Optional[list]: + async def get_tracks_by_artist_song( + self, artist: str, song: str, n: int = 0 + ) -> Optional[list]: """Get track by artist and song name Args: artist (str): The name of the artist. @@ -496,9 +568,18 @@ class SRUtil: TODO: Reimplement using StreamRip """ try: - search_res = await self._safe_api_call(self.streamrip_client.search, media_type="track", query=f"{artist} - {song}", retries=3) + search_res = await self._safe_api_call( + self.streamrip_client.search, + media_type="track", + query=f"{artist} - {song}", + retries=3, + ) logging.critical("Result: %s", search_res) - return search_res[0].get('items') if search_res and isinstance(search_res, list) else [] + return ( + search_res[0].get("items") + if search_res and isinstance(search_res, list) + else [] + ) except Exception as e: traceback.print_exc() logging.critical("Search Exception: %s", str(e)) @@ -529,10 +610,15 @@ class SRUtil: quality_int = 1 track_id_str: str = str(track_id) - # Ensure client is logged in via safe call when needed inside _safe_api_call + # Ensure client is logged in via safe call when needed inside _safe_api_call try: logging.critical("Using quality_int: %s", quality_int) - track = await self._safe_api_call(self.streamrip_client.get_downloadable, track_id=track_id_str, quality=quality_int, retries=3) + track = await self._safe_api_call( + self.streamrip_client.get_downloadable, + track_id=track_id_str, + quality=quality_int, + retries=3, + ) except Exception as e: logging.warning("get_stream_url_by_track_id failed: %s", e) return None @@ -557,7 +643,7 @@ class SRUtil: metadata = await self.rate_limited_request( self.streamrip_client.get_metadata, str(track_id), "track" ) - + album_id = metadata.get("album", {}).get("id") album_metadata = None @@ -567,7 +653,11 @@ class SRUtil: album_metadata = self.METADATA_ALBUM_CACHE[album_id] else: album_metadata = await self.rate_limited_request( - lambda i, t: self._safe_api_call(self.streamrip_client.get_metadata, i, t, retries=2), album_id, "album" + lambda i, t: self._safe_api_call( + self.streamrip_client.get_metadata, i, t, retries=2 + ), + album_id, + "album", ) if not album_metadata: return None @@ -611,11 +701,12 @@ class SRUtil: self.MAX_METADATA_RETRIES, ) # Raise a specific exception so callers can react (e.g. notify) - raise MetadataFetchError(f"Metadata fetch failed permanently for track {track_id} after {self.MAX_METADATA_RETRIES} attempts: {e}") + raise MetadataFetchError( + f"Metadata fetch failed permanently for track {track_id} after {self.MAX_METADATA_RETRIES} attempts: {e}" + ) # If we reach here without returning, raise a generic metadata error raise MetadataFetchError(f"Metadata fetch failed for track {track_id}") - async def download(self, track_id: int, quality: str = "LOSSLESS") -> bool | str: """Download track Args: