"""Scan a music directory and embed missing cover art into audio files.

Every audio file under ``MUSIC_DIR`` that has no embedded artwork is looked up
against several sources in turn (MusicBrainz by ISRC, SRUtil, MusicBrainz /
Cover Art Archive, Discogs, Deezer, Spotify, iTunes, Last.fm). The first image
found is embedded with ``music_tag``. The per-file outcome is printed as a
table and written to ``REPORT_CSV``.
"""

# Standard library
import os
import sys
import re
import csv
import asyncio
import logging
import traceback

# Third-party
import aiohttp
from pathlib import Path
from dotenv import load_dotenv
from rapidfuzz import fuzz
from music_tag import load_file  # type: ignore
from rich.console import Console
from rich.markup import escape
from rich.table import Table
from rich.progress import (
    Progress,
    BarColumn,
    TextColumn,
    TimeElapsedColumn,
    TaskProgressColumn,
)

# Local imports
sys.path.insert(0, "..")
from utils.sr_wrapper import SRUtil  # noqa: E402
import musicbrainzngs  # type: ignore  # noqa: E402
from discogs_client import Client  # type: ignore  # noqa: E402

# typing helper
from typing import Any, cast, Optional  # noqa: E402

# Optional: use the popular `itunespy` PyPI package when available
try:
    import itunespy  # type: ignore

    HAVE_ITUNESPY = True
except Exception:
    itunespy = None
    HAVE_ITUNESPY = False

# Optional: use `spotipy` when available for Spotify lookups
try:
    import spotipy  # type: ignore

    HAVE_SPOTIPY = True
except Exception:
    spotipy = None
    HAVE_SPOTIPY = False

# Reminder: If you see 'Import "music_tag" could not be resolved', run:
#   uv add music-tag

# Configurable paths and extensions
MUSIC_DIR = Path("/storage/music2/completed/FLAC/review")
AUDIO_EXTS = {".flac", ".mp3", ".m4a", ".ogg", ".wav", ".aac"}
REPORT_CSV = "cover_art_report.csv"

# Maps (artist, album) -> image bytes already found during this run.
ALBUM_ART_CACHE: dict = {}

# Concurrency limit for async processing
CONCURRENCY = 18

# Placeholders used when a tag is missing from a file.
UNKNOWN_ARTIST = "Unknown Artist"
UNKNOWN_ALBUM = "Unknown Album"
UNKNOWN_TITLE = "Unknown Title"

# Minimum number of seconds between two requests to the same service.
RATE_LIMITS_SECONDS = {
    "srutil": 0.1,
    "musicbrainz": 1.0,
    "itunes": 0.5,
    "discogs": 1.0,
    "deezer": 0.5,
    "lastfm": 1.0,
    "spotify": 0.5,
}

# Load env once (must happen before any os.getenv() below)
load_dotenv()

# Console for pretty output
console = Console()

# If set to '1'|'true', run only Spotify searches (useful for quick testing)
ONLY_SPOTIFY = os.getenv("ONLY_SPOTIFY", "").lower() in ("1", "true", "yes")

# If set, print query strings and brief response info for debugging
COVER_DEBUG_QUERIES = os.getenv("COVER_DEBUG_QUERIES", "").lower() in (
    "1",
    "true",
    "yes",
)

# If set, use more aggressive fuzzy thresholds and extra fallbacks
COVER_AGGRESSIVE = os.getenv("COVER_AGGRESSIVE", "").lower() in ("1", "true", "yes")

# Initialize MusicBrainz client
musicbrainzngs.set_useragent("cover-art-script", "1.0", "your-email@example.com")

# Initialize Discogs client
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN")
discogs_client = Client("cover-art-script/1.0", user_token=DISCOGS_TOKEN)

# Suppress noisy loggers (aiohttp, urllib3, etc.)
for noisy_logger in [
    "aiohttp.client",
    "aiohttp.server",
    "aiohttp.access",
    "urllib3",
    "asyncio",
    "chardet",
    "requests.packages.urllib3",
]:
    logging.getLogger(noisy_logger).setLevel(logging.CRITICAL)
    logging.getLogger(noisy_logger).propagate = False

# Also suppress root logger to CRITICAL for anything not our own
logging.getLogger().setLevel(logging.CRITICAL)


# ---------------------------------------------------------------------------
# Console helpers
# ---------------------------------------------------------------------------
def _print_styled(style, message):
    """Print *message* in a rich *style*, escaping it first.

    Artist/album names and file paths regularly contain square brackets;
    escaping keeps rich from parsing them as markup tags.
    """
    console.print(f"[{style}]{escape(str(message))}[/{style}]")


def _debug(message):
    """Print a cyan ``[DEBUG]`` line when COVER_DEBUG_QUERIES is enabled."""
    if not COVER_DEBUG_QUERIES:
        return
    try:
        _print_styled("cyan", f"[DEBUG] {message}")
    except Exception:
        # Debug output must never break a lookup.
        pass


def _log_attempt(artist, album, title, source, result):
    """Log a single, clean attempt line to console and to `search_attempts.log`.

    result should be a short string like 'Success', 'No match', 'Timeout',
    or an error message.
    """
    try:
        a = artist or UNKNOWN_ARTIST
        al = album or UNKNOWN_ALBUM
        t = title or UNKNOWN_TITLE
        line = f"SEARCH: {a} - {al} / {t} | Source: {source} | Result: {result}"
        # The line carries no intended markup, so print it literally.
        console.print(line, markup=False)
        try:
            with open("search_attempts.log", "a", encoding="utf-8") as lf:
                lf.write(line + "\n")
        except OSError:
            pass
    except Exception:
        # Never crash logging
        pass


# ---------------------------------------------------------------------------
# Rate limiting and downloads
# ---------------------------------------------------------------------------
class AsyncRateLimiter:
    """Lightweight async limiter enforcing a minimum gap between acquisitions."""

    def __init__(self, rate_seconds: float):
        self._rate = float(rate_seconds)
        self._lock = asyncio.Lock()
        self._last = 0.0

    async def acquire(self) -> None:
        """Wait until at least ``rate_seconds`` have passed since the last call."""
        async with self._lock:
            loop = asyncio.get_running_loop()
            wait = self._rate - (loop.time() - self._last)
            if wait > 0:
                await asyncio.sleep(wait)
            self._last = loop.time()


async def _download_image(session, url, limiter=None, *, total_timeout=15):
    """Fetch *url* and return the response body, or None.

    None is returned for any non-200 status and for any request error.
    When *limiter* is given it is acquired before the request is sent.
    """
    if limiter is not None:
        await limiter.acquire()
    try:
        timeout = aiohttp.ClientTimeout(total=total_timeout)
        async with session.get(url, timeout=timeout) as resp:
            if resp.status == 200:
                return await resp.read()
    except Exception:
        # Best-effort download: a failure just means "no image from here".
        return None
    return None


# ---------------------------------------------------------------------------
# Name helpers
# ---------------------------------------------------------------------------
def strip_album_tags(album):
    """Remove common parenthetical tags from the end of album names."""
    pattern = r"\s*\((deluxe|remaster(ed)?|original mix|expanded|bonus|edition|version|mono|stereo|explicit|clean|anniversary|special|reissue|expanded edition|bonus track(s)?|international|digital|single|ep|live|instrumental|karaoke|radio edit|explicit version|clean version|acoustic|demo|re-recorded|remix|mix|edit|feat\.?|featuring|with .+|from .+|soundtrack|ost|score|session|vol(ume)? ?\d+|disc ?\d+|cd ?\d+|lp ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])\)$"
    return re.sub(pattern, "", album, flags=re.IGNORECASE).strip()


def strip_album_suffix(album):
    """Remove a trailing tag such as ' EP', ' - Single' or ' 1999' from *album*.

    The tag must be preceded by at least one separator (whitespace, '-', '_'
    or ':'), so titles that merely end in those letters or consist only of a
    year ("Sleep", "Help", "1989") are left untouched.
    """
    suffix_pattern = r"[\s\-_:]+(ep|lp|single|album|remix|version|edit|mix|deluxe|expanded|anniversary|reissue|instrumental|karaoke|ost|score|session|mono|stereo|explicit|clean|bonus|disc ?\d+|cd ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])$"
    return re.sub(suffix_pattern, "", album, flags=re.IGNORECASE).strip()


def normalize_name(name):
    """Return a lowercase, punctuation-free, single-spaced form of *name*."""
    name = (name or "").lower().strip()
    name = re.sub(r"\([0-9]\)$", "", name)  # remove (1), (2), etc. at end
    name = re.sub(r"[\(\)\[\]\{\}\'\"\!\?\.,:;`~@#$%^&*+=|\\/<>]", "", name)
    # Collapse whitespace last so removed punctuation cannot leave double spaces.
    return re.sub(r"[\s_]+", " ", name).strip()


def artist_credit_to_name(ac):
    """Safely convert a MusicBrainz artist-credit array into a single artist name string."""
    parts = []
    for a in ac:
        if not isinstance(a, dict):
            continue
        # Common formats: {'name': 'Artist Name'} or {'artist': {'name': 'Artist Name'}}
        name = a.get("name")
        if not name and isinstance(a.get("artist"), dict):
            name = a["artist"].get("name")
        if name:
            parts.append(name)
    return " ".join(parts)


def is_fuzzy_match(expected, actual, threshold=80):
    """Return True when the token-set ratio of the two strings reaches *threshold*."""
    if not expected or not actual:
        return False
    return fuzz.token_set_ratio(expected.lower(), actual.lower()) >= threshold


def is_metadata_match(
    expected_artist,
    expected_album,
    expected_title,
    found_artist,
    found_album,
    found_title,
    threshold=80,
):
    """Fuzzy-compare artist, album and title; the album is skipped when not expected."""
    artist_match = is_fuzzy_match(expected_artist, found_artist, threshold)
    album_match = (
        is_fuzzy_match(expected_album, found_album, threshold)
        if expected_album
        else True
    )
    title_match = is_fuzzy_match(expected_title, found_title, threshold)
    return artist_match and album_match and title_match


def format_failure_reason(e, resp_status=None):
    """Format a failure reason from an exception or response status"""
    if isinstance(e, asyncio.TimeoutError):
        return "timeout"
    if isinstance(e, aiohttp.ClientError):
        return f"network error: {str(e)}"
    if resp_status:
        return f"HTTP {resp_status}"
    if e:
        return str(e)
    return "no match found"


async def log_api_response(api_name, response):
    """Log relevant parts of API responses for debugging purposes."""
    try:
        data = await response.json()
        if api_name == "MusicBrainz":
            relevant_info = [
                {
                    "id": rg.get("id"),
                    "title": rg.get("title"),
                    "artist": artist_credit_to_name(rg.get("artist-credit", [])),
                }
                for rg in data.get("release-groups", [])
            ]
        elif api_name == "Discogs":
            relevant_info = [
                {
                    "id": result.get("id"),
                    "title": result.get("title"),
                    "cover_image": result.get("cover_image"),
                }
                for result in data.get("results", [])
            ]
        elif api_name == "iTunes":
            relevant_info = [
                {
                    "collectionId": result.get("collectionId"),
                    "collectionName": result.get("collectionName"),
                    "artworkUrl100": result.get("artworkUrl100"),
                }
                for result in data.get("results", [])
            ]
        else:
            _print_styled("cyan", f"[DEBUG] {api_name} response: {data}")
            return
        _print_styled("cyan", f"[DEBUG] {api_name} relevant response: {relevant_info}")
    except Exception as e:
        _print_styled("red", f"[DEBUG] Failed to parse {api_name} response: {e}")


# ---------------------------------------------------------------------------
# MusicBrainz / Cover Art Archive
# ---------------------------------------------------------------------------
async def _search_release_groups(artist, album, *, strict):
    """Search MusicBrainz release groups in a worker thread; {} on any error.

    The artist/album are passed as field keywords so that musicbrainzngs
    builds the query itself instead of us interpolating raw tag text.
    """
    try:
        return await asyncio.to_thread(
            musicbrainzngs.search_release_groups,
            limit=5,
            strict=strict,
            artist=artist,
            release=album,
        )
    except Exception:
        return {}


async def search_musicbrainz_cover(
    artist, album, session: aiohttp.ClientSession, limiter: "AsyncRateLimiter"
):
    """Return front-cover bytes for artist+album via MusicBrainz, or None.

    Release groups are searched strictly first and loosely when that finds
    nothing; each listed release is then tried on the Cover Art Archive.
    """
    try:
        release_groups = []
        for strict in (True, False):
            res = await _search_release_groups(artist, album, strict=strict)
            release_groups = res.get("release-group-list") or []
            if release_groups:
                break

        if COVER_DEBUG_QUERIES:
            dbg_info = [
                {
                    "id": rg.get("id"),
                    "title": rg.get("title"),
                    "artist": artist_credit_to_name(rg.get("artist-credit", [])),
                }
                for rg in release_groups[:3]
            ]
            _debug(f"MusicBrainz top candidates: {dbg_info}")

        for rg in release_groups:
            for rel in rg.get("release-list") or []:
                relid = rel.get("id")
                if not relid:
                    continue
                caa_url = f"https://coverartarchive.org/release/{relid}/front-500"
                image = await _download_image(session, caa_url, limiter)
                if image:
                    return image
        return None
    except Exception as e:
        _print_styled("red", f"MusicBrainz search exception: {e}")
        return None


async def search_musicbrainz_by_isrc(session, isrc, limiter: AsyncRateLimiter):
    """Return cover bytes for the first release of a recording with *isrc*, or None."""
    if not isrc:
        return None
    headers = {"User-Agent": "cover-art-script/1.0"}
    # The ISRC lookup returns recordings; `inc=releases` is required for the
    # recordings to carry the release list this function reads below.
    url = f"https://musicbrainz.org/ws/2/isrc/{isrc}"
    params = {"fmt": "json", "inc": "releases"}
    try:
        await limiter.acquire()
        timeout = aiohttp.ClientTimeout(total=15)
        async with session.get(
            url, params=params, headers=headers, timeout=timeout
        ) as resp:
            if resp.status != 200:
                return None
            try:
                data = await resp.json()
            except Exception:
                return None

        for rec in data.get("recordings") or []:
            releases = rec.get("releases") or []
            if not releases:
                continue
            relid = releases[0].get("id")
            if not relid:
                continue
            caa_url = f"https://coverartarchive.org/release/{relid}/front-500"
            image = await _download_image(session, caa_url)
            if image is not None:
                _print_styled("green", f"Found cover art via ISRC {isrc}")
                return image
        return None
    except Exception as e:
        _print_styled("red", f"MusicBrainz ISRC lookup exception for {isrc}: {e}")
        return None


# ---------------------------------------------------------------------------
# Discogs
# ---------------------------------------------------------------------------
def _discogs_first_page(query, **fields):
    """Run a Discogs search and return its first page as a list (blocking)."""
    return list(discogs_client.search(query, **fields).page(1))


async def _discogs_search(limiter, query, **fields):
    """Rate-limited Discogs search executed in a worker thread; [] on any error."""
    try:
        await limiter.acquire()
        return await asyncio.to_thread(_discogs_first_page, query, **fields)
    except Exception:
        return []


async def search_discogs_cover(
    artist, album, session: aiohttp.ClientSession, limiter: "AsyncRateLimiter"
):
    """Return cover bytes from the first usable Discogs release, or None.

    Skipped entirely when DISCOGS_TOKEN is not configured.
    """
    try:
        if not DISCOGS_TOKEN:
            return None

        _debug(f"Discogs query: album='{album}' artist='{artist}'")
        # Search fields must be keyword arguments; a dict passed positionally
        # is treated as part of the query text.
        results = await _discogs_search(
            limiter, album, artist=artist, type="release"
        )

        if COVER_DEBUG_QUERIES:
            dbg = []
            for rr in results[:3]:
                data = getattr(rr, "data", None) or {}
                dbg.append(
                    {
                        "id": data.get("id"),
                        "title": data.get("title") or getattr(rr, "title", None),
                        "cover_image": data.get("cover_image"),
                    }
                )
            _debug(f"Discogs candidates: {dbg}")

        if not results:
            # conservative normalized fallback: try a combined normalized string
            combined = f"{normalize_name(artist)} {normalize_name(album)}"
            _debug(f"Discogs fallback query: {combined}")
            results = await _discogs_search(limiter, combined, type="release")

        for r in results:
            data = getattr(r, "data", None) or {}
            cover = data.get("cover_image")
            if not cover:
                # try images list
                imgs = data.get("images") or []
                if imgs and isinstance(imgs, list) and isinstance(imgs[0], dict):
                    cover = imgs[0].get("uri")
            if not cover:
                continue
            image = await _download_image(session, cover, limiter)
            if image:
                return image
        return None
    except Exception as e:
        _print_styled("red", f"Discogs search exception: {e}")
        return None


# ---------------------------------------------------------------------------
# iTunes (via itunespy)
# ---------------------------------------------------------------------------
# kind -> (module-level function, class exposing .search, entity for search())
_ITUNES_LOOKUPS = {
    "album": ("search_album", "Album", "album"),
    "track": ("search_track", "Track", "song"),
}

# Attribute/key spellings under which a result may expose its artwork URL.
_ITUNES_ARTWORK_FIELDS = (
    "artwork_url",
    "artworkUrl100",
    "artwork_url_100",
    "artwork_url100",
)


def _itunes_search_sync(query, kind):
    """Call whichever itunespy search entry point exists; None on failure."""
    mod = cast(Any, itunespy)
    func_name, class_name, entity = _ITUNES_LOOKUPS[kind]
    try:
        if hasattr(mod, func_name):
            return getattr(mod, func_name)(query)
        cls = getattr(mod, class_name, None)
        if cls is not None and hasattr(cls, "search"):
            return cls.search(query)
        if hasattr(mod, "search"):
            return mod.search(query, entity=entity)
    except Exception:
        return None
    return None


def _itunes_field(item, *names):
    """Return the first truthy attribute or dict value of *item* among *names*."""
    for name in names:
        value = item.get(name) if isinstance(item, dict) else getattr(item, name, None)
        if value:
            return value
    return None


async def _itunes_cover(session, limiter, kind, artist, name):
    """Shared implementation of the iTunes album and track lookups."""
    if not HAVE_ITUNESPY:
        _print_styled(
            "yellow",
            f"iTunes: itunespy not available; skipping iTunes {kind} search "
            f"for '{artist} - {name}'",
        )
        return None
    try:
        items = await asyncio.to_thread(_itunes_search_sync, f"{artist} {name}", kind)

        if COVER_DEBUG_QUERIES and items:
            dbg = [
                {
                    "id": _itunes_field(it, "collectionId", "collection_id"),
                    "name": _itunes_field(it, "collectionName", "collection_name"),
                }
                for it in list(items)[:3]
            ]
            _debug(f"iTunes {kind} candidates: {dbg}")

        if not items:
            _debug(
                f"iTunes {kind}: no results for '{artist} - {name}', "
                "trying normalized fallback"
            )
            norm_q = f"{normalize_name(artist)} {normalize_name(name)}"
            items = await asyncio.to_thread(_itunes_search_sync, norm_q, kind)
        if not items:
            return None

        art_url = _itunes_field(items[0], *_ITUNES_ARTWORK_FIELDS)
        if not art_url:
            return None
        # Normalize to higher-res if possible
        art_url = art_url.replace("100x100bb", "600x600bb")
        return await _download_image(session, art_url, limiter)
    except Exception as e:
        _print_styled("red", f"[ERROR] itunespy {kind} search exception: {e}")
        return None


async def search_itunes_cover(
    session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter"
):
    """Return artwork bytes for the first iTunes album result, or None."""
    return await _itunes_cover(session, limiter, "album", artist, album)


async def search_itunes_track(
    session: aiohttp.ClientSession, artist, title, limiter: "AsyncRateLimiter"
):
    """Return artwork bytes for the first iTunes track result, or None."""
    return await _itunes_cover(session, limiter, "track", artist, title)


# ---------------------------------------------------------------------------
# Deezer
# ---------------------------------------------------------------------------
async def _deezer_album_search(session, query, limiter):
    """Return the Deezer album hits for *query*, or None on a non-200 reply."""
    await limiter.acquire()
    timeout = aiohttp.ClientTimeout(total=10)
    async with session.get(
        "https://api.deezer.com/search/album",
        params={"q": query, "limit": "1"},
        timeout=timeout,
    ) as resp:
        if resp.status != 200:
            return None
        data = await resp.json()
    return data.get("data") or []


async def search_deezer_cover(
    session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter"
):
    """Search Deezer for an album cover. Uses Deezer public API (no auth)."""
    try:
        query = f"{artist} {album}"
        _debug(f"Deezer query: {query}")
        items = await _deezer_album_search(session, query, limiter)
        if items is None:
            return None

        if COVER_DEBUG_QUERIES:
            dbg = [
                {
                    "id": it.get("id"),
                    "title": it.get("title"),
                    "cover_xl": it.get("cover_xl"),
                    "cover_big": it.get("cover_big"),
                }
                for it in items[:3]
            ]
            _debug(f"Deezer candidates: {dbg}")

        if not items:
            # try a conservative normalized fallback
            norm_q = f"{normalize_name(artist)} {normalize_name(album)}"
            _debug(f"Deezer fallback query: {norm_q}")
            items = await _deezer_album_search(session, norm_q, limiter)
        if not items:
            return None

        first = items[0]
        # prefer XL or big covers
        art_url = first.get("cover_xl") or first.get("cover_big") or first.get("cover")
        if not art_url:
            return None
        return await _download_image(session, art_url, limiter)
    except Exception:
        return None


# ---------------------------------------------------------------------------
# Last.fm
# ---------------------------------------------------------------------------
async def search_lastfm_cover(
    session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter"
):
    """Search Last.fm for album cover using album.getInfo. Requires LASTFM_API_KEY in env."""
    LASTFM_API_KEY = os.getenv("LASTFM_API_KEY")
    if not LASTFM_API_KEY:
        _print_styled(
            "yellow",
            "LastFM: LASTFM_API_KEY not configured; skipping LastFM search "
            f"for '{artist} - {album}'",
        )
        return None
    try:
        params = {
            "method": "album.getinfo",
            "api_key": LASTFM_API_KEY,
            "artist": artist,
            "album": album,
            "format": "json",
        }
        await limiter.acquire()
        timeout = aiohttp.ClientTimeout(total=10)
        # HTTPS: the request carries the API key.
        async with session.get(
            "https://ws.audioscrobbler.com/2.0/", params=params, timeout=timeout
        ) as resp:
            if resp.status != 200:
                return None
            data = await resp.json()

        # images is a list of dicts with '#text' and 'size'
        images = (data.get("album") or {}).get("image") or []
        art_url = None
        # prefer the largest available size
        for size_name in ("mega", "extralarge", "large", "medium"):
            art_url = next(
                (
                    img.get("#text")
                    for img in images
                    if img.get("size") == size_name and img.get("#text")
                ),
                None,
            )
            if art_url:
                break
        if not art_url:
            return None
        return await _download_image(session, art_url, limiter)
    except Exception:
        return None


# ---------------------------------------------------------------------------
# Spotify
# ---------------------------------------------------------------------------
_SPOTIFY_CLIENT = None


def get_spotify_client():
    """Lazily create and cache a spotipy.Spotify client using client-credentials.

    Returns None if spotipy is not installed or credentials are not configured.
    """
    global _SPOTIFY_CLIENT
    if _SPOTIFY_CLIENT is not None:
        return _SPOTIFY_CLIENT
    if not HAVE_SPOTIPY:
        return None
    client_id = os.getenv("SPOTIFY_CLIENT_ID")
    client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
    if not client_id or not client_secret:
        return None
    try:
        import importlib

        sp_mod = importlib.import_module("spotipy")
        creds_mod = importlib.import_module("spotipy.oauth2")
        SpotifyClientCredentials = getattr(creds_mod, "SpotifyClientCredentials", None)
        SpotifyCls = getattr(sp_mod, "Spotify", None)
        if SpotifyClientCredentials is None or SpotifyCls is None:
            return None
        creds = SpotifyClientCredentials(
            client_id=client_id, client_secret=client_secret
        )
        _SPOTIFY_CLIENT = SpotifyCls(client_credentials_manager=creds)
        return _SPOTIFY_CLIENT
    except Exception:
        return None


async def search_spotify_cover(
    session: aiohttp.ClientSession,
    artist,
    album,
    limiter: "AsyncRateLimiter",
    isrc: Optional[str] = None,
):
    """Search Spotify for album cover with multiple strategies:

    - If `isrc` provided, try track search by ISRC first.
    - Try quoted album+artist queries, then looser queries.
    - Use fuzzy matching to validate results.
    - Pick the largest available image.
    """
    client = get_spotify_client()
    if client is None:
        _print_styled(
            "yellow",
            "Spotify: client not configured or spotipy not installed; "
            f"skipping search for '{artist} - {album}'",
        )
        return None

    def _sp_search(q, typ="album", limit=3):
        try:
            return client.search(q=q, type=typ, limit=limit)
        except Exception:
            return None

    async def _fetch_largest(images):
        """Download the widest image of *images*; None when unavailable."""
        if not images:
            return None
        best = max(images, key=lambda x: x.get("width") or 0)
        art_url = best.get("url")
        if not art_url:
            return None
        return await _download_image(session, art_url, limiter)

    try:
        # 1) ISRC search (track -> album). A failure here falls through to
        #    the album queries instead of ending the whole lookup.
        if isrc:
            res = await asyncio.to_thread(_sp_search, f"isrc:{isrc}", "track", 1)
            tracks = (res or {}).get("tracks", {}).get("items", [])
            if tracks:
                album_obj = tracks[0].get("album") or {}
                image = await _fetch_largest(album_obj.get("images") or [])
                if image:
                    return image

        # 2) Album queries, most specific first
        queries = [
            f'album:"{album}" artist:"{artist}"',
            f"artist:{artist} album:{album}",
            f"album:{normalize_name(album)} artist:{normalize_name(artist)}",
            f'album:"{album}"',
            f'artist:"{artist}"',
        ]
        for q in queries:
            res = await asyncio.to_thread(_sp_search, q, "album", 3)
            if not res:
                continue
            albums = res.get("albums", {}).get("items", [])

            if COVER_DEBUG_QUERIES:
                dbg = [
                    {
                        "id": a.get("id"),
                        "name": a.get("name"),
                        "artists": [
                            ar.get("name")
                            for ar in (a.get("artists") or [])[:3]
                            if ar.get("name")
                        ],
                        "images": [
                            img.get("url") for img in (a.get("images") or [])[:3]
                        ],
                    }
                    for a in albums[:3]
                ]
                _debug(f"Spotify album candidates for query '{q}': {dbg}")

            # examine candidates and accept the first fuzzy match with an image
            for a in albums:
                found_album = a.get("name") or ""
                found_artist = " ".join(
                    ar.get("name") for ar in (a.get("artists") or []) if ar.get("name")
                )
                artist_ok = is_fuzzy_match(artist, found_artist, threshold=75)
                album_ok = not album or is_fuzzy_match(
                    album, found_album, threshold=70
                )
                if not (artist_ok and album_ok):
                    continue
                image = await _fetch_largest(a.get("images") or [])
                if image:
                    return image
        return None
    except Exception:
        return None


# ---------------------------------------------------------------------------
# SRUtil
# ---------------------------------------------------------------------------
async def fetch_srutil_cover(
    sr, artist, song, session: aiohttp.ClientSession, limiter: AsyncRateLimiter
):
    """Return cover bytes resolved through the SRUtil wrapper, or None."""
    try:
        album = await sr.get_album_by_name(artist, song)
        if not album or not album.get("id"):
            return None
        cover_url = await sr.get_cover_by_album_id(album["id"], 640)
        if cover_url:
            await limiter.acquire()
            try:
                timeout = aiohttp.ClientTimeout(total=15)
                async with session.get(cover_url, timeout=timeout) as resp:
                    if resp.status == 200:
                        return await resp.read()
                    _print_styled(
                        "red",
                        "SRUtil: Failed to fetch cover art from URL "
                        f"(status {resp.status}): {cover_url}",
                    )
            except Exception as e:
                _print_styled("red", f"SRUtil: Exception fetching cover url: {e}")
    except Exception as e:
        msg = str(e)
        if "Cannot combine AUTHORIZATION header with AUTH argument" in msg:
            _print_styled(
                "red",
                "SRUtil: Skipping due to conflicting authentication method in "
                "dependency (AUTHORIZATION header + AUTH argument).",
            )
        else:
            _print_styled("red", f"SRUtil: Exception: {e}")
    return None


# ---------------------------------------------------------------------------
# Tag access (music_tag is blocking, so everything runs in a worker thread)
# ---------------------------------------------------------------------------
async def get_isrc(file):
    """Return the ISRC tag of *file* (first value if it is a list), or None."""

    def _read_isrc():
        f = load_file(file)
        # music_tag may store ISRC under 'isrc' or 'ISRC'
        try:
            val = f["isrc"].value
        except Exception:
            try:
                val = f["ISRC"].value
            except Exception:
                val = None
        if isinstance(val, list):
            return val[0] if val else None
        return val

    try:
        return await asyncio.to_thread(_read_isrc)
    except Exception as e:
        _print_styled("red", f"Error reading ISRC for {file}: {e}")
        return None


async def has_cover(file):
    """Return True when *file* already has embedded artwork; False on any error."""

    def _probe():
        # music_tag stores artwork in 'artwork' which may be a list-like field
        art = load_file(file)["artwork"]
        try:
            return bool(art.first)
        except Exception:
            # fallback if .first not available
            return bool(art)

    try:
        return await asyncio.to_thread(_probe)
    except Exception:
        return False


async def get_artist_album_title(file):
    """Return (artist, album, title) from the tags, with placeholders for gaps."""

    def _read():
        f = load_file(file)
        artist = str(f["artist"].first) if f["artist"].first else UNKNOWN_ARTIST
        album = str(f["album"].first) if f["album"].first else UNKNOWN_ALBUM
        title = str(f["title"].first) if f["title"].first else UNKNOWN_TITLE
        return artist, album, title

    try:
        return await asyncio.to_thread(_read)
    except Exception:
        return UNKNOWN_ARTIST, UNKNOWN_ALBUM, UNKNOWN_TITLE


async def embed_cover(file, image_bytes):
    """Embed *image_bytes* as artwork of *file*; return True on success."""

    def _write():
        f = load_file(file)
        f["artwork"] = image_bytes
        f.save()

    try:
        await asyncio.to_thread(_write)
        return True
    except Exception as e:
        _print_styled("red", f"[ERROR] Failed to embed cover: {e}")
        return False


# ---------------------------------------------------------------------------
# Per-file processing
# ---------------------------------------------------------------------------
def _record_result(table, results, file, status, source):
    """Append one outcome row to the report table and to the CSV rows."""
    label = source if source else "-"
    table.add_row(escape(str(file)), status, label)
    results.append([file, status, label])


async def _try_sources(artist, album, title, sources):
    """Run (name, lookup) pairs in order; return (image, name) of the first hit.

    Every attempt is logged. Returns (None, None) when nothing matched.
    """
    for name, lookup in sources:
        image = await lookup()
        if image:
            _log_attempt(artist, album, title, name, "Success")
            return image, name
        _log_attempt(artist, album, title, name, "No match")
    return None, None


async def process_file(
    file, sr, table, results, sem, session: aiohttp.ClientSession, limiters: dict
):
    """Process a single audio file to find and embed cover art."""
    async with sem:
        if await has_cover(file):
            _record_result(table, results, file, "Already Present", None)
            return

        artist, album, title = await get_artist_album_title(file)
        isrc = await get_isrc(file)
        album_key = (artist, album)
        # Untagged files all share the placeholder key, so never cache for them.
        cacheable = artist != UNKNOWN_ARTIST and album != UNKNOWN_ALBUM

        def _spotify():
            return search_spotify_cover(
                session, artist, album, limiters["spotify"], isrc
            )

        if ONLY_SPOTIFY:
            # Testing mode: attempt only Spotify.
            image_bytes, source = await _try_sources(
                artist, album, title, [("Spotify", _spotify)]
            )
            not_found = "No Spotify cover art found"
        else:
            not_found = "No cover art found"
            image_bytes = ALBUM_ART_CACHE.get(album_key) if cacheable else None
            source = "Cache" if image_bytes else None
            if not image_bytes:
                sources = []
                if isrc:
                    # Try ISRC-based lookup first
                    sources.append(
                        (
                            f"MusicBrainz (ISRC:{isrc})",
                            lambda: search_musicbrainz_by_isrc(
                                session, isrc, limiters["musicbrainz"]
                            ),
                        )
                    )
                sources += [
                    (
                        "SRUtil",
                        lambda: fetch_srutil_cover(
                            sr, artist, album, session, limiters["srutil"]
                        ),
                    ),
                    (
                        "MusicBrainz",
                        lambda: search_musicbrainz_cover(
                            artist, album, session, limiters["musicbrainz"]
                        ),
                    ),
                    (
                        "Discogs",
                        lambda: search_discogs_cover(
                            artist, album, session, limiters["discogs"]
                        ),
                    ),
                    (
                        "Deezer",
                        lambda: search_deezer_cover(
                            session, artist, album, limiters["deezer"]
                        ),
                    ),
                    ("Spotify", _spotify),
                    (
                        "iTunes(album)",
                        lambda: search_itunes_cover(
                            session, artist, album, limiters["itunes"]
                        ),
                    ),
                    (
                        "iTunes(track)",
                        lambda: search_itunes_track(
                            session, artist, title, limiters["itunes"]
                        ),
                    ),
                    (
                        "LastFM",
                        lambda: search_lastfm_cover(
                            session, artist, album, limiters["lastfm"]
                        ),
                    ),
                ]
                image_bytes, source = await _try_sources(
                    artist, album, title, sources
                )

        # Embed and summary
        status = "Failed"
        label = f"{os.path.basename(file)} — {artist} / {album}"
        if image_bytes:
            if cacheable:
                ALBUM_ART_CACHE[album_key] = image_bytes
            if await embed_cover(file, image_bytes):
                status = "Success"
                _print_styled(
                    "green", f"[FINAL RESULT] {label} | Success via {source}"
                )
            else:
                status = "Embed Failed"
                _print_styled(
                    "red", f"[FINAL RESULT] {label} | Embed Failed from {source}"
                )
        else:
            _print_styled("yellow", f"[FINAL RESULT] {label} | {not_found}")

        _record_result(table, results, file, status, source)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
async def main():
    """Walk MUSIC_DIR, process every audio file, then print and save the report."""
    try:
        _print_styled("bold blue", f"Scanning directory: {MUSIC_DIR}")
        sr = SRUtil()
        results = []
        files = []
        for root, _, filenames in os.walk(MUSIC_DIR):
            for fn in filenames:
                if os.path.splitext(fn)[1].lower() in AUDIO_EXTS:
                    files.append(os.path.join(root, fn))

        table = Table(title="Cover Art Embedding Report")
        table.add_column("File", style="cyan", overflow="fold")
        table.add_column("Status", style="green")
        table.add_column("Source", style="magenta")

        # create rate limiters (seconds between requests)
        limiters = {
            name: AsyncRateLimiter(seconds)
            for name, seconds in RATE_LIMITS_SECONDS.items()
        }
        sem = asyncio.Semaphore(CONCURRENCY)

        async with aiohttp.ClientSession() as session:
            with Progress(
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TaskProgressColumn(),
                TimeElapsedColumn(),
            ) as progress:
                task_id = progress.add_task("Processing files...", total=len(files))

                async def worker(file):
                    # One bad file must not cancel the rest of the batch.
                    try:
                        await process_file(
                            file, sr, table, results, sem, session, limiters
                        )
                    except Exception as e:
                        _print_styled(
                            "red",
                            f"[ERROR] Unhandled exception processing {file}: {e}",
                        )
                        _record_result(table, results, file, "Error", None)
                    finally:
                        progress.update(task_id, advance=1)

                # Schedule all workers
                await asyncio.gather(*(worker(file) for file in files))

        # Print summary table and CSV after progress bar
        console.print(table)
        with open(REPORT_CSV, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["File", "Status", "Source"])
            writer.writerows(results)
        _print_styled("bold green", f"CSV report written to {REPORT_CSV}")
    except Exception as e:
        _print_styled("red", f"[ERROR] Unhandled exception: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())