"""Scan MUSIC_DIR for audio files without embedded cover art and embed art found
via SRUtil, MusicBrainz/Cover Art Archive, or the iTunes Search API; print a
summary table and write a CSV report (REPORT_CSV)."""

import os
import csv
import re
import time
import sys
import random
import asyncio
import logging
import threading

import requests
from music_tag import load_file
from rich.console import Console
from rich.table import Table
from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn, TaskProgressColumn

sys.path.insert(0, "..")
from utils.sr_wrapper import SRUtil
from rapidfuzz import fuzz


# Helper to strip common parenthetical tags from album names
def strip_album_tags(album):
    """Remove common parenthetical tags from the end of album names."""
    pattern = r"\s*\((deluxe|remaster(ed)?|original mix|expanded|bonus|edition|version|mono|stereo|explicit|clean|anniversary|special|reissue|expanded edition|bonus track(s)?|international|digital|single|ep|live|instrumental|karaoke|radio edit|explicit version|clean version|acoustic|demo|re-recorded|remix|mix|edit|feat\.?|featuring|with .+|from .+|soundtrack|ost|score|session|vol(ume)? ?\d+|disc ?\d+|cd ?\d+|lp ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])\)$"
    return re.sub(pattern, "", album, flags=re.IGNORECASE).strip()


# Helper to strip common trailing tags like EP, LP, Single, Album, etc. from album names
def strip_album_suffix(album):
    # Remove trailing tags like ' EP', ' LP', ' Single', ' Album', ' Remix', ' Version', etc.
    # Only if they appear at the end, case-insensitive, with or without punctuation
    suffix_pattern = r"[\s\-_:]*(ep|lp|single|album|remix|version|edit|mix|deluxe|expanded|anniversary|reissue|instrumental|karaoke|ost|score|session|mono|stereo|explicit|clean|bonus|disc ?\d+|cd ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])$"
    return re.sub(suffix_pattern, "", album, flags=re.IGNORECASE).strip()


# iTunes/Apple Music API fallback
def search_itunes_cover(artist, album):
    """Search the public iTunes/Apple Music Search API for album art."""
    import urllib.parse

    base_url = "https://itunes.apple.com/search"
    params = {
        "term": f"{artist} {album}",
        "entity": "album",
        "limit": 1,
        "media": "music",
    }
    url = f"{base_url}?{urllib.parse.urlencode(params)}"
    try:
        resp = requests.get(url, timeout=10)
        if resp.status_code != 200:
            return None
        data = resp.json()
        if data.get("resultCount", 0) == 0:
            return None
        result = data["results"][0]
        # Use the highest-res artwork available
        art_url = result.get("artworkUrl100")
        if art_url:
            art_url = art_url.replace("100x100bb", "600x600bb")
            img_resp = requests.get(art_url, timeout=10)
            if img_resp.status_code == 200:
                return img_resp.content
    except Exception:
        pass
    return None


# Fuzzy match helper for metadata
def is_fuzzy_match(expected, actual, threshold=80):
    if not expected or not actual:
        return False
    return fuzz.token_set_ratio(expected.lower(), actual.lower()) >= threshold


# Fuzzy match for all fields
def is_metadata_match(expected_artist, expected_album, expected_title,
                      found_artist, found_album, found_title, threshold=80):
    artist_match = is_fuzzy_match(expected_artist, found_artist, threshold)
    album_match = is_fuzzy_match(expected_album, found_album, threshold) if expected_album else True
    title_match = is_fuzzy_match(expected_title, found_title, threshold)
    return artist_match and album_match and title_match


# Utility to normalize artist/song names for searching
def normalize_name(name):
    # Lowercase, strip, remove extra spaces, and remove common punctuation
    name = name.lower().strip()
    name = re.sub(r"\([0-9]\)$", "", name)  # remove (1), (2), etc. at end
    name = re.sub(r"[\s_]+", " ", name)
    name = re.sub(r"[\(\)\[\]\{\}\'\"\!\?\.,:;`~@#$%^&*+=|\\/<>]", "", name)
    return name


# Suppress noisy loggers (aiohttp, urllib3, etc.)
for noisy_logger in [
    "aiohttp.client",
    "aiohttp.server",
    "aiohttp.access",
    "urllib3",
    "asyncio",
    "chardet",
    "requests.packages.urllib3",
]:
    logging.getLogger(noisy_logger).setLevel(logging.CRITICAL)
    logging.getLogger(noisy_logger).propagate = False

# Also suppress root logger to CRITICAL for anything not our own
logging.getLogger().setLevel(logging.CRITICAL)

# Directory to scan
MUSIC_DIR = "/storage/music2/completed/FLAC/review"
REPORT_CSV = "cover_art_report.csv"
AUDIO_EXTS = {".flac", ".mp3", ".m4a"}

console = Console()

# MusicBrainz API helpers
# Limit concurrent MusicBrainz requests; the lookups below are blocking requests
# calls, so a threading semaphore (not an asyncio one) is the right primitive here.
MUSICBRAINZ_SEMAPHORE = threading.Semaphore(1)


def search_musicbrainz_cover(artist, album, max_retries=4):
    url = f"https://musicbrainz.org/ws/2/release-group/?query=artist:{artist} AND release:{album}&fmt=json"
    headers = {"User-Agent": "cover-art-script/1.0"}
    delay = 1.5
    for attempt in range(1, max_retries + 1):
        # Limit concurrency: only one MusicBrainz request in flight at a time
        with MUSICBRAINZ_SEMAPHORE:
            resp = requests.get(url, headers=headers)
            if resp.status_code == 503:
                console.print(f"[yellow]MusicBrainz 503 error, retrying (attempt {attempt})...[/yellow]")
                time.sleep(delay + random.uniform(0, 0.5))
                delay *= 2
                continue
            if resp.status_code != 200:
                console.print(f"[red]MusicBrainz API error: {resp.status_code}[/red]")
                return None
            try:
                data = resp.json()
            except Exception as e:
                console.print(f"[red]MusicBrainz API returned invalid JSON for {artist} - {album}: {e}[/red]")
                return None
            if not data.get("release-groups"):
                console.print(f"[red]No release-groups found for {artist} - {album}[/red]")
                return None
            rgid = data["release-groups"][0]["id"]
            caa_url = f"https://coverartarchive.org/release-group/{rgid}/front-500"
            caa_resp = requests.get(caa_url)
            if caa_resp.status_code == 200:
                console.print(f"[green]Found cover art on Cover Art Archive for {artist} - {album}[/green]")
                return caa_resp.content
            console.print(f"[red]No cover art found on Cover Art Archive for {artist} - {album}[/red]")
            return None
    console.print(f"[red]MusicBrainz API failed after {max_retries} attempts for {artist} - {album}[/red]")
    return None


async def fetch_srutil_cover(sr, artist, album_name):
    try:
        album = await sr.get_album_by_name(artist, album_name)
        if not album or not album.get('id'):
            return None
        cover_url = await sr.get_cover_by_album_id(album['id'], 640)
        if cover_url:
            resp = requests.get(cover_url)
            if resp.status_code == 200:
                return resp.content
            else:
                console.print(f"[red]SRUtil: Failed to fetch cover art from URL (status {resp.status_code}): {cover_url}[/red]")
    except Exception as e:
        msg = str(e)
        if "Cannot combine AUTHORIZATION header with AUTH argument" in msg:
            console.print("[red]SRUtil: Skipping due to conflicting authentication method in dependency (AUTHORIZATION header + AUTH argument).[/red]")
        else:
            console.print(f"[red]SRUtil: Exception: {e}[/red]")
    return None


def has_cover(file):
    try:
        f = load_file(file)
        return bool(f['artwork'].first)
    except Exception as e:
        console.print(f"[red]Error checking cover art for {file}: {e}[/red]")
        return False


def embed_cover(file, image_bytes):
    try:
        f = load_file(file)
        f['artwork'] = image_bytes
        f.save()
        return True
    except Exception as e:
console.print(f"[red]Failed to embed cover art into {file}: {e}[/red]") return False def get_artist_album_title(file): try: f = load_file(file) artist = f['artist'].value or "" album = f['album'].value or "" title = f['title'].value or os.path.splitext(os.path.basename(file))[0] return artist, album, title except Exception as e: console.print(f"[red]Error reading tags for {file}: {e}[/red]") return "", "", os.path.splitext(os.path.basename(file))[0] # Concurrency limit for async processing CONCURRENCY = 12 async def process_file(file, sr, table, results, sem): async with sem: if has_cover(file): table.add_row(file, "Already Present", "-") results.append([file, "Already Present", "-"]) return artist, album, title = get_artist_album_title(file) # Use a global or passed-in cache dict for album art if not hasattr(process_file, "album_art_cache"): process_file.album_art_cache = {} album_key = (artist, album) image_bytes = process_file.album_art_cache.get(album_key) source = "SRUtil" if image_bytes is None: image_bytes = await fetch_srutil_cover(sr, artist, album) if image_bytes: process_file.album_art_cache[album_key] = image_bytes if not image_bytes: image_bytes = search_musicbrainz_cover(normalize_name(artist), normalize_name(album)) source = "MusicBrainz" if image_bytes: process_file.album_art_cache[album_key] = image_bytes if not image_bytes: image_bytes = search_itunes_cover(artist, album) source = "iTunes" if image_bytes: process_file.album_art_cache[album_key] = image_bytes # If all lookups failed, try with parenthetical tag stripped if not image_bytes and re.search(r"\([^)]*\)$", album): cleaned_album = strip_album_tags(album) if cleaned_album and cleaned_album != album: cleaned_key = (artist, cleaned_album) image_bytes = process_file.album_art_cache.get(cleaned_key) if image_bytes is None: image_bytes = await fetch_srutil_cover(sr, artist, cleaned_album) if image_bytes: process_file.album_art_cache[cleaned_key] = image_bytes if not image_bytes: image_bytes = search_musicbrainz_cover(normalize_name(artist), normalize_name(cleaned_album)) source = "MusicBrainz (stripped)" if image_bytes: process_file.album_art_cache[cleaned_key] = image_bytes if not image_bytes: image_bytes = search_itunes_cover(artist, cleaned_album) source = "iTunes (stripped)" if image_bytes: process_file.album_art_cache[cleaned_key] = image_bytes # If still not found, try with common suffixes (EP, LP, etc.) 
        if not image_bytes:
            suffix_stripped_album = strip_album_suffix(album)
            if suffix_stripped_album and suffix_stripped_album != album:
                suffix_key = (artist, suffix_stripped_album)
                image_bytes = process_file.album_art_cache.get(suffix_key)
                if image_bytes is None:
                    image_bytes = await fetch_srutil_cover(sr, artist, suffix_stripped_album)
                    if image_bytes:
                        process_file.album_art_cache[suffix_key] = image_bytes
                if not image_bytes:
                    image_bytes = search_musicbrainz_cover(normalize_name(artist), normalize_name(suffix_stripped_album))
                    source = "MusicBrainz (suffix-stripped)"
                    if image_bytes:
                        process_file.album_art_cache[suffix_key] = image_bytes
                if not image_bytes:
                    image_bytes = search_itunes_cover(artist, suffix_stripped_album)
                    source = "iTunes (suffix-stripped)"
                    if image_bytes:
                        process_file.album_art_cache[suffix_key] = image_bytes
        if isinstance(image_bytes, bytes):
            ok = embed_cover(file, image_bytes)
            status = "Embedded" if ok else "Failed to Embed"
            if ok:
                console.print(f"[green]Embedded cover art from {source}:[/green] {file}")
            else:
                console.print(f"[red]Failed to embed cover art ({source}):[/red] {file}")
        elif image_bytes:
            status = "Failed to Embed (not bytes)"
            console.print(f"[red]Failed to embed cover art (not bytes) ({source}):[/red] {file}")
        else:
            status = "Not Found"
            source = "-"
            console.print(f"[red]No cover art found:[/red] {file}")
        table.add_row(file, status, source)
        results.append([file, status, source])


async def main():
    console.print(f"[bold blue]Scanning directory: {MUSIC_DIR}[/bold blue]")
    sr = SRUtil()
    results = []
    files = []
    for root, _, filenames in os.walk(MUSIC_DIR):
        for fn in filenames:
            if os.path.splitext(fn)[1].lower() in AUDIO_EXTS:
                file_path = os.path.join(root, fn)
                files.append(file_path)

    table = Table(title="Cover Art Embedding Report")
    table.add_column("File", style="cyan", overflow="fold")
    table.add_column("Status", style="green")
    table.add_column("Source", style="magenta")

    sem = asyncio.Semaphore(CONCURRENCY)

    async def worker(file, sr, table, results, sem, progress, task_id):
        await process_file(file, sr, table, results, sem)
        progress.update(task_id, advance=1)

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn(),
    ) as progress:
        task_id = progress.add_task("Processing files...", total=len(files))
        # Schedule all workers
        await asyncio.gather(*(worker(file, sr, table, results, sem, progress, task_id) for file in files))

    # Print summary table and CSV after progress bar
    console.print(table)
    with open(REPORT_CSV, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["File", "Status", "Source"])
        writer.writerows(results)
    console.print(f"[bold green]CSV report written to {REPORT_CSV}[/bold green]")


if __name__ == "__main__":
    asyncio.run(main())
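
# Usage note: the script takes no command-line arguments. It assumes MUSIC_DIR
# exists and that SRUtil from utils/sr_wrapper.py is already configured with any
# credentials it needs; edit MUSIC_DIR / REPORT_CSV above to point elsewhere.
# Running it directly, e.g.
#     python cover_art_embed.py    # filename is hypothetical
# walks the library, embeds any art it finds into files that lack it, and writes
# cover_art_report.csv to the current working directory.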