Files
api/test/add_cover_art.py
codey c2044711fb - Changed API key validation from if not _key in self.constants.API_KEYS to if _key not in self.constants.API_KEYS for better readability.
Enhance RadioUtil playlist handling and deduplication

- Added checks to ensure playlists are initialized and not empty.
- Improved deduplication logic to prevent modifying the original playlist during iteration.
- Added logging for duplicate removal and playlist population.

Add cover art handling in rip_background.py

- Implemented functionality to attach album art if provided in metadata.
- Added error handling for cover art download failures.

Introduce unique filename handling in rip_background.py

- Added `ensure_unique_filename_in_dir` function to prevent overwriting files with the same name.

Refactor SRUtil for improved error handling and metadata fetching

- Introduced `MetadataFetchError` for better error management during metadata retrieval.
- Implemented `_safe_api_call` for resilient API calls with retry logic.
- Enhanced `get_artists_by_name` to optionally group results by artist name.
- Updated various methods to utilize the new error handling and retry mechanisms.
2025-09-22 11:08:48 -04:00

1087 lines
45 KiB
Python

# Standard library
import os
import sys
import re
import csv
import asyncio
import logging
import traceback
# Third-party
import aiohttp
from pathlib import Path
from dotenv import load_dotenv
from rapidfuzz import fuzz
from music_tag import load_file # type: ignore
from rich.console import Console
from rich.table import Table
from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn, TaskProgressColumn
# Local imports
sys.path.insert(0, "..")
from utils.sr_wrapper import SRUtil
import musicbrainzngs # type: ignore
from discogs_client import Client # type: ignore
# typing helper
from typing import Any, cast, Optional
# Optional: use the popular `itunespy` PyPI package when available
try:
import itunespy # type: ignore
HAVE_ITUNESPY = True
except Exception:
itunespy = None
HAVE_ITUNESPY = False
# Optional: use `spotipy` when available for Spotify lookups
try:
import spotipy # type: ignore
HAVE_SPOTIPY = True
except Exception:
spotipy = None
HAVE_SPOTIPY = False
# Reminder: If you see 'Import "music_tag" could not be resolved', run:
# uv add music-tag
# Configurable paths and extensions.
# MUSIC_DIR is the directory tree that main() walks for audio files.
MUSIC_DIR = Path("/storage/music2/completed/FLAC/review")
# File extensions (lower-case, with leading dot) that main() treats as audio.
AUDIO_EXTS = {'.flac', '.mp3', '.m4a', '.ogg', '.wav', '.aac'}
# Path of the CSV summary that main() writes after processing.
REPORT_CSV = "cover_art_report.csv"
# In-memory cache of downloaded cover bytes, keyed by (artist, album) in process_file().
ALBUM_ART_CACHE: dict = {}
# Reminder: If you see 'Import "music_tag" could not be resolved', run:
# uv add music-tag
async def search_musicbrainz_cover(artist, album, session: aiohttp.ClientSession, limiter: 'AsyncRateLimiter'):
    """Look up front cover art via MusicBrainz and the Cover Art Archive.

    Searches MusicBrainz release groups for ``artist`` + ``album`` (the blocking
    ``musicbrainzngs`` call runs in a worker thread), then requests the 500px
    front image from the Cover Art Archive for each release listed under the
    returned release groups, in order.

    Args:
        artist: Artist name used in the search query.
        album: Album/release title used in the search query.
        session: Shared aiohttp session used for the image downloads.
        limiter: Rate limiter awaited before every Cover Art Archive request.

    Returns:
        The bytes of the first cover that downloads with HTTP 200, or ``None``
        when nothing is found or an unexpected error occurs.
    """
    try:
        query = f"artist:{artist} AND release:{album}"
        try:
            # musicbrainzngs is synchronous; keep it off the event loop.
            res = await asyncio.to_thread(musicbrainzngs.search_release_groups, query, limit=5)
        except Exception:
            res = {}
        rgs = res.get('release-group-list') or []

        # Single debug dump (the original printed the same candidates twice).
        if COVER_DEBUG_QUERIES:
            try:
                dbg_info = [
                    {
                        'id': rg.get('id'),
                        'title': rg.get('title'),
                        'artist': artist_credit_to_name(rg.get('artist-credit', [])),
                    }
                    for rg in rgs[:3]
                ]
                console.print(f"[cyan][DEBUG] MusicBrainz candidates: {dbg_info}[/cyan]")
            except Exception:
                # Debug output must never affect the lookup itself.
                pass

        timeout = aiohttp.ClientTimeout(total=15)
        for rg in rgs:
            for rel in rg.get('release-list') or []:
                relid = rel.get('id')
                if not relid:
                    continue
                caa_url = f"https://coverartarchive.org/release/{relid}/front-500"
                try:
                    await limiter.acquire()
                    async with session.get(caa_url, timeout=timeout) as resp:
                        if resp.status == 200:
                            return await resp.read()
                except Exception:
                    # A failed download for one release should not stop the scan.
                    continue
        return None
    except Exception as e:
        console.print(f"[red]MusicBrainz search exception: {e}[/red]")
        return None
async def search_discogs_cover(artist, album, session: aiohttp.ClientSession, limiter: 'AsyncRateLimiter'):
    """Look up cover art for an album through the Discogs search API.

    Runs a release search for ``album`` filtered by ``artist``; when that yields
    nothing, retries once with a normalized "artist album" free-text query.
    The first result image that downloads with HTTP 200 is returned.

    Args:
        artist: Artist name.
        album: Album/release title.
        session: Shared aiohttp session used for the image download.
        limiter: Rate limiter awaited before each Discogs search and download.

    Returns:
        Image bytes, or ``None`` when no token is configured, nothing matches,
        or an unexpected error occurs.
    """
    if not DISCOGS_TOKEN:
        return None

    # Upper bound on how many search hits are inspected per query.
    max_results = 25

    def _search(*query, **fields):
        # Runs in a worker thread. The client is synchronous and loads result
        # pages lazily while iterating, so the hits are materialised here rather
        # than on the event loop.
        from itertools import islice
        return list(islice(discogs_client.search(*query, **fields), max_results))

    try:
        try:
            await limiter.acquire()
            if COVER_DEBUG_QUERIES:
                console.print(f"[cyan][DEBUG] Discogs query: album='{album}' artist='{artist}'")
            # Filters must be keyword arguments; a dict passed positionally is
            # treated as a query term and makes the client raise.
            results = await asyncio.to_thread(_search, album, artist=artist, type='release')
        except Exception as e:
            if COVER_DEBUG_QUERIES:
                console.print(f"[cyan][DEBUG] Discogs search failed: {e}[/cyan]")
            results = []

        if COVER_DEBUG_QUERIES:
            try:
                dbg = []
                for rr in results[:3]:
                    try:
                        data = getattr(rr, 'data', {}) or {}
                        dbg.append({
                            'id': data.get('id'),
                            'title': data.get('title') or getattr(rr, 'title', None),
                            'cover_image': data.get('cover_image'),
                        })
                    except Exception:
                        continue
                console.print(f"[cyan][DEBUG] Discogs candidates: {dbg}[/cyan]")
            except Exception:
                pass

        if not results:
            # Conservative normalized fallback: one combined free-text query.
            try:
                await limiter.acquire()
                combined = f"{normalize_name(artist)} {normalize_name(album)}"
                if COVER_DEBUG_QUERIES:
                    console.print(f"[cyan][DEBUG] Discogs fallback query: {combined}")
                results = await asyncio.to_thread(_search, combined, type='release')
            except Exception as e:
                if COVER_DEBUG_QUERIES:
                    console.print(f"[cyan][DEBUG] Discogs fallback search failed: {e}[/cyan]")
                results = []

        timeout = aiohttp.ClientTimeout(total=15)
        for r in results:
            # A result's raw payload may carry 'cover_image' or an 'images' list.
            try:
                data = getattr(r, 'data', None) or {}
                cover = data.get('cover_image')
                if not cover:
                    imgs = data.get('images') or []
                    if imgs and isinstance(imgs, list) and imgs[0].get('uri'):
                        cover = imgs[0].get('uri')
            except Exception:
                continue
            if not cover:
                continue
            try:
                await limiter.acquire()
                async with session.get(cover, timeout=timeout) as resp:
                    if resp.status == 200:
                        return await resp.read()
            except Exception:
                continue
        return None
    except Exception as e:
        console.print(f"[red]Discogs search exception: {e}[/red]")
        return None
# Load variables from a .env file into the process environment once, before any os.getenv() below.
load_dotenv()
# Shared rich console used for all user-facing output in this script.
console = Console()
# Boolean env flags: each is True when the variable is '1', 'true' or 'yes' (case-insensitive).
# ONLY_SPOTIFY: process_file() attempts only the Spotify search (useful for quick testing).
ONLY_SPOTIFY = os.getenv('ONLY_SPOTIFY', '').lower() in ('1', 'true', 'yes')
# COVER_DEBUG_QUERIES: print query strings and brief candidate info for debugging.
COVER_DEBUG_QUERIES = os.getenv('COVER_DEBUG_QUERIES', '').lower() in ('1', 'true', 'yes')
# COVER_AGGRESSIVE: intended to enable more aggressive fuzzy thresholds and extra fallbacks.
# NOTE(review): no use of this flag is visible in this part of the file — confirm it is still needed.
COVER_AGGRESSIVE = os.getenv('COVER_AGGRESSIVE', '').lower() in ('1', 'true', 'yes')
def _log_attempt(artist, album, title, source, result):
    """Report one lookup attempt on the console and append it to `search_attempts.log`.

    Missing artist/album/title values are shown as "Unknown ..." placeholders.
    `result` is a short outcome string such as 'Success', 'No match', 'Timeout',
    or an error message. Any failure while logging is ignored on purpose.
    """
    try:
        shown_artist = artist if artist else "Unknown Artist"
        shown_album = album if album else "Unknown Album"
        shown_title = title if title else "Unknown Title"
        entry = (
            f"SEARCH: {shown_artist} - {shown_album} / {shown_title}"
            f" | Source: {source} | Result: {result}"
        )
        console.print(entry)
        try:
            with open("search_attempts.log", "a", encoding="utf-8") as log_file:
                log_file.write(entry + "\n")
        except Exception:
            # The log file is best-effort; the console line was already printed.
            pass
    except Exception:
        # Logging must never take down the caller.
        pass
# Define a lightweight async rate limiter
class AsyncRateLimiter:
    """Space out calls so that consecutive acquisitions are at least N seconds apart.

    Callers ``await limiter.acquire()`` before each request. Waiting happens
    while the internal lock is held, so concurrent callers are released one at
    a time, each ``rate_seconds`` after the previous one.
    """

    def __init__(self, rate_seconds: float):
        """Create a limiter enforcing ``rate_seconds`` between acquisitions."""
        self._rate = float(rate_seconds)
        self._lock = asyncio.Lock()
        # Loop time of the previous acquisition; None until the first one so
        # the first caller never waits, whatever the loop clock's origin is.
        self._last = None

    async def acquire(self) -> None:
        """Wait until the minimum interval since the last acquisition has passed."""
        # We are always inside a coroutine here, so the running loop is the
        # right clock source (get_event_loop() is discouraged in this context).
        loop = asyncio.get_running_loop()
        async with self._lock:
            if self._last is not None:
                wait = self._rate - (loop.time() - self._last)
                if wait > 0:
                    await asyncio.sleep(wait)
            self._last = loop.time()
# Register the application name, version and contact string with musicbrainzngs.
# NOTE(review): the contact address is a placeholder — replace with a real one before heavy use.
musicbrainzngs.set_useragent("cover-art-script", "1.0", "your-email@example.com")
# Discogs client; DISCOGS_TOKEN may be None, in which case search_discogs_cover() returns early.
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN")
discogs_client = Client("cover-art-script/1.0", user_token=DISCOGS_TOKEN)
# Define the log_api_response function at the top of the script
async def log_api_response(api_name, response):
    """Print a trimmed view of an API's JSON response for debugging.

    For "MusicBrainz", "Discogs" and "iTunes" only a few identifying fields of
    each result are shown; any other ``api_name`` prints the decoded payload
    as-is. Parsing problems are reported on the console, never raised.
    """
    try:
        payload = await response.json()

        if api_name == "MusicBrainz":
            summary = []
            for group in payload.get("release-groups", []):
                summary.append({
                    "id": group.get("id"),
                    "title": group.get("title"),
                    "artist": artist_credit_to_name(group.get("artist-credit", [])),
                })
        elif api_name in ("Discogs", "iTunes"):
            # Both services wrap their hits in a "results" list; only the
            # fields worth showing differ.
            if api_name == "Discogs":
                wanted = ("id", "title", "cover_image")
            else:
                wanted = ("collectionId", "collectionName", "artworkUrl100")
            summary = [
                {field: entry.get(field) for field in wanted}
                for entry in payload.get("results", [])
            ]
        else:
            console.print(f"[cyan][DEBUG] {api_name} response: {payload}[/cyan]")
            return

        console.print(f"[cyan][DEBUG] {api_name} relevant response: {summary}[/cyan]")
    except Exception as e:
        console.print(f"[red][DEBUG] Failed to parse {api_name} response: {e}[/red]")
# Helper to strip common parenthetical tags from album names
def strip_album_tags(album):
    """Drop one trailing parenthesised tag (e.g. "(Remastered)", "(Live)", "(2011)") from an album name.

    Only a tag whose whole parenthesised text is one of the known keywords is
    removed, and only when it sits at the very end of the name.
    """
    trailing_tag = re.compile(
        r"\s*\((deluxe|remaster(ed)?|original mix|expanded|bonus|edition|version|mono|stereo|explicit|clean|anniversary|special|reissue|expanded edition|bonus track(s)?|international|digital|single|ep|live|instrumental|karaoke|radio edit|explicit version|clean version|acoustic|demo|re-recorded|remix|mix|edit|feat\.?|featuring|with .+|from .+|soundtrack|ost|score|session|vol(ume)? ?\d+|disc ?\d+|cd ?\d+|lp ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])\)$",
        re.IGNORECASE,
    )
    without_tag = trailing_tag.sub("", album)
    return without_tag.strip()
# Helper to strip common trailing tags like EP, LP, Single, Album, etc. from album names
def strip_album_suffix(album):
    """Remove one trailing release-type tag (EP, LP, Single, Remix, a year, ...) from an album name.

    The tag is removed only when it is the last word of the name and is
    separated from the preceding text by whitespace, '-', '_' or ':'.
    Matching is case-insensitive. Requiring a separator keeps ordinary words
    that merely end in a tag ("Help", "Lost", "Deep") and names that consist
    solely of a tag ("1989") intact.

    Args:
        album: Album name to clean.

    Returns:
        The album name without the trailing tag, stripped of outer whitespace.
    """
    # '+' (not '*') on the separator class: at least one separator must precede
    # the tag, otherwise the tag would be cut out of the middle of a word.
    suffix_pattern = r"[\s\-_:]+(ep|lp|single|album|remix|version|edit|mix|deluxe|expanded|anniversary|reissue|instrumental|karaoke|ost|score|session|mono|stereo|explicit|clean|bonus|disc ?\d+|cd ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])$"
    return re.sub(suffix_pattern, "", album, flags=re.IGNORECASE).strip()
# iTunes/Apple Music API fallback (async)
def _itunes_search(kind, query):
    """Run one blocking itunespy search; ``kind`` is 'album' or 'track'.

    Tries, in order, the module-level ``search_album``/``search_track``
    function, the ``Album``/``Track`` class ``search`` method, and the generic
    ``search(..., entity=...)`` function, using whichever exists first.
    Returns the search results, or ``None`` on any error or when no entry
    point is available.
    """
    mod = cast(Any, itunespy)
    func_name, class_name, entity = {
        'album': ('search_album', 'Album', 'album'),
        'track': ('search_track', 'Track', 'song'),
    }[kind]
    try:
        if hasattr(mod, func_name):
            return getattr(mod, func_name)(query)
        holder = getattr(mod, class_name, None)
        if holder is not None and hasattr(holder, 'search'):
            return holder.search(query)
        if hasattr(mod, 'search'):
            return mod.search(query, entity=entity)
        return None
    except Exception:
        return None


def _itunes_field(item, *names):
    """Return the first truthy value among ``names`` on a result object or dict."""
    for name in names:
        try:
            value = item.get(name) if isinstance(item, dict) else getattr(item, name, None)
        except Exception:
            value = None
        if value:
            return value
    return None


async def _itunes_lookup(session, limiter, kind, artist, name):
    """Shared album/track flow: search, normalized retry, then download artwork."""
    if not HAVE_ITUNESPY:
        console.print(f"[yellow]iTunes: itunespy not available; skipping iTunes {kind} search for '{artist} - {name}'[/yellow]")
        return None
    try:
        # itunespy is synchronous; run it in a worker thread.
        results = await asyncio.to_thread(_itunes_search, kind, f"{artist} {name}")
        if COVER_DEBUG_QUERIES and results:
            try:
                dbg = [
                    {
                        # NOTE(review): attribute naming (camelCase vs snake_case)
                        # on itunespy results is assumed — confirm against the
                        # installed version.
                        'id': _itunes_field(a, 'collectionId', 'collection_id'),
                        'name': _itunes_field(a, 'collectionName', 'collection_name'),
                    }
                    for a in results[:3]
                ]
                console.print(f"[cyan][DEBUG] iTunes {kind} candidates: {dbg}[/cyan]")
            except Exception:
                pass
        if not results:
            if COVER_DEBUG_QUERIES:
                console.print(f"[cyan][DEBUG] iTunes {kind}: no results for '{artist} - {name}', trying normalized fallback")
            norm_q = f"{normalize_name(artist)} {normalize_name(name)}"
            results = await asyncio.to_thread(_itunes_search, kind, norm_q)
        if not results:
            return None

        art_url = _itunes_field(results[0], 'artwork_url', 'artwork_url_100', 'artworkUrl100')
        if not art_url:
            return None
        # Ask for a higher-resolution rendition when the URL is the 100px one;
        # replace() is a no-op otherwise.
        art_url = art_url.replace('100x100bb', '600x600bb')

        await limiter.acquire()
        try:
            async with session.get(art_url, timeout=aiohttp.ClientTimeout(total=15)) as img_resp:
                if img_resp.status == 200:
                    return await img_resp.read()
        except Exception:
            return None
        return None
    except Exception as e:
        console.print(f"[red][ERROR] itunespy {kind} search exception: {e}[/red]")
        return None


async def search_itunes_cover(session: aiohttp.ClientSession, artist, album, limiter: 'AsyncRateLimiter'):
    """Find album artwork through itunespy using an "artist album" query.

    Args:
        session: Shared aiohttp session used to download the artwork.
        artist: Artist name.
        album: Album title.
        limiter: Rate limiter awaited before the artwork download.

    Returns:
        Image bytes for the first search hit, or ``None`` when itunespy is not
        installed, nothing is found, or the download fails.
    """
    return await _itunes_lookup(session, limiter, 'album', artist, album)


async def search_itunes_track(session: aiohttp.ClientSession, artist, title, limiter: 'AsyncRateLimiter'):
    """Find artwork through itunespy using an "artist title" track query.

    Args:
        session: Shared aiohttp session used to download the artwork.
        artist: Artist name.
        title: Track title.
        limiter: Rate limiter awaited before the artwork download.

    Returns:
        Image bytes for the first search hit, or ``None`` when itunespy is not
        installed, nothing is found, or the download fails.
    """
    return await _itunes_lookup(session, limiter, 'track', artist, title)
async def search_deezer_cover(session: aiohttp.ClientSession, artist, album, limiter: 'AsyncRateLimiter'):
    """Search Deezer for an album cover. Uses Deezer public API (no auth).

    Queries the album search endpoint with "artist album"; if that returns no
    items, retries once with the normalized names. Every HTTP request,
    including the fallback search, goes through ``limiter``.

    Args:
        session: Shared aiohttp session.
        artist: Artist name.
        album: Album title.
        limiter: Rate limiter awaited before each request.

    Returns:
        Image bytes of the first hit's largest available cover, or ``None`` on
        a non-200 response, no match, or any error.
    """
    try:
        from urllib.parse import quote
        timeout = aiohttp.ClientTimeout(total=10)

        async def _search(q):
            # Returns the item list, or None when the API answered non-200.
            url = f"https://api.deezer.com/search/album?q={quote(q)}&limit=1"
            await limiter.acquire()
            async with session.get(url, timeout=timeout) as resp:
                if resp.status != 200:
                    return None
                data = await resp.json()
            return data.get('data') or []

        query = f"{artist} {album}"
        if COVER_DEBUG_QUERIES:
            console.print(f"[cyan][DEBUG] Deezer query: {query}")
        items = await _search(query)
        if items is None:
            return None
        if COVER_DEBUG_QUERIES:
            try:
                dbg = [
                    {
                        'id': it.get('id'),
                        'title': it.get('title'),
                        'cover_xl': it.get('cover_xl'),
                        'cover_big': it.get('cover_big'),
                    }
                    for it in items[:3]
                ]
                console.print(f"[cyan][DEBUG] Deezer candidates: {dbg}[/cyan]")
            except Exception:
                pass

        if not items:
            # Conservative normalized fallback (rate-limited like the first try).
            norm_q = f"{normalize_name(artist)} {normalize_name(album)}"
            if COVER_DEBUG_QUERIES:
                console.print(f"[cyan][DEBUG] Deezer fallback query: {norm_q}")
            items = await _search(norm_q)
            if not items:
                return None

        first = items[0]
        # Prefer XL or big covers.
        art_url = first.get('cover_xl') or first.get('cover_big') or first.get('cover')
        if not art_url:
            return None
        await limiter.acquire()
        img_timeout = aiohttp.ClientTimeout(total=15)
        async with session.get(art_url, timeout=img_timeout) as img_resp:
            if img_resp.status == 200:
                return await img_resp.read()
    except Exception:
        return None
    return None
async def search_lastfm_cover(session: aiohttp.ClientSession, artist, album, limiter: 'AsyncRateLimiter'):
    """Search Last.fm for album cover using album.getInfo. Requires LASTFM_API_KEY in env.

    Args:
        session: Shared aiohttp session.
        artist: Artist name.
        album: Album title.
        limiter: Rate limiter awaited before the API call and the download.

    Returns:
        Image bytes of the largest listed image (mega, extralarge, large, then
        medium), or ``None`` when the key is missing, the API answers non-200,
        no image URL is listed, or any error occurs.
    """
    LASTFM_API_KEY = os.getenv('LASTFM_API_KEY')
    if not LASTFM_API_KEY:
        console.print(f"[yellow]LastFM: LASTFM_API_KEY not configured; skipping LastFM search for '{artist} - {album}'[/yellow]")
        return None
    try:
        params = {
            'method': 'album.getinfo',
            'api_key': LASTFM_API_KEY,
            'artist': artist,
            'album': album,
            'format': 'json',
        }
        await limiter.acquire()
        timeout = aiohttp.ClientTimeout(total=10)
        # HTTPS so the API key is not sent in clear text; aiohttp encodes the
        # query string from `params`.
        async with session.get("https://ws.audioscrobbler.com/2.0/", params=params, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            data = await resp.json()

        images = (data.get('album') or {}).get('image') or []
        # images is a list of dicts with '#text' (URL) and 'size'; keep the
        # first non-empty URL seen for each size.
        url_by_size = {}
        for img in images:
            if isinstance(img, dict) and img.get('#text'):
                url_by_size.setdefault(img.get('size'), img.get('#text'))
        art_url = next(
            (url_by_size[size] for size in ('mega', 'extralarge', 'large', 'medium') if size in url_by_size),
            None,
        )
        if not art_url:
            return None

        await limiter.acquire()
        img_timeout = aiohttp.ClientTimeout(total=15)
        async with session.get(art_url, timeout=img_timeout) as img_resp:
            if img_resp.status == 200:
                return await img_resp.read()
    except Exception:
        return None
    return None
_SPOTIFY_CLIENT = None


def get_spotify_client():
    """Return the shared spotipy client, building it on first use.

    The client authenticates with the client-credentials flow using
    SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET from the environment. ``None`` is
    returned when spotipy is unavailable, either credential is missing, or
    construction fails; nothing is cached in those cases.
    """
    global _SPOTIFY_CLIENT
    if _SPOTIFY_CLIENT is not None:
        return _SPOTIFY_CLIENT
    if not HAVE_SPOTIPY:
        return None

    spotify_id = os.getenv('SPOTIFY_CLIENT_ID')
    spotify_secret = os.getenv('SPOTIFY_CLIENT_SECRET')
    if not (spotify_id and spotify_secret):
        return None

    try:
        from importlib import import_module
        # Resolve the classes dynamically so a partial install degrades to None.
        client_cls = getattr(import_module('spotipy'), 'Spotify', None)
        creds_cls = getattr(import_module('spotipy.oauth2'), 'SpotifyClientCredentials', None)
        if creds_cls is None or client_cls is None:
            return None
        credentials = creds_cls(client_id=spotify_id, client_secret=spotify_secret)
        _SPOTIFY_CLIENT = client_cls(client_credentials_manager=credentials)
        return _SPOTIFY_CLIENT
    except Exception:
        return None
async def search_spotify_cover(session: aiohttp.ClientSession, artist, album, limiter: 'AsyncRateLimiter', isrc: Optional[str] = None):
    """Search Spotify for album cover with multiple strategies:
    - If `isrc` provided, try track search by ISRC first.
    - Try quoted album+artist queries, then looser queries.
    - Use fuzzy matching to validate results.
    - Pick the largest available image.

    Every Spotify API search and every image download awaits ``limiter``
    first. A failure in the ISRC step falls through to the album queries.

    Args:
        session: Shared aiohttp session used for image downloads.
        artist: Expected artist name.
        album: Expected album title (may be empty, which skips album validation).
        limiter: Rate limiter for Spotify requests.
        isrc: Optional ISRC of the track.

    Returns:
        Image bytes, or ``None`` when no client is configured, nothing
        matches, or an unexpected error occurs.
    """
    client = get_spotify_client()
    if client is None:
        console.print(f"[yellow]Spotify: client not configured or spotipy not installed; skipping search for '{artist} - {album}'[/yellow]")
        return None

    def _sp_search(q, typ='album', limit=3):
        try:
            return client.search(q=q, type=typ, limit=limit)
        except Exception:
            return None

    async def _search(q, typ, limit):
        # spotipy is synchronous; rate-limit, then run it in a worker thread.
        await limiter.acquire()
        return await asyncio.to_thread(_sp_search, q, typ, limit)

    async def _download_largest(images):
        # Pick the widest image and fetch it; None on any failure.
        if not images:
            return None
        best = max(images, key=lambda x: x.get('width') or 0)
        art_url = best.get('url')
        if not art_url:
            return None
        await limiter.acquire()
        try:
            async with session.get(art_url, timeout=aiohttp.ClientTimeout(total=15)) as img_resp:
                if img_resp.status == 200:
                    return await img_resp.read()
        except Exception:
            return None
        return None

    try:
        # 1) ISRC search (track -> album)
        if isrc:
            res = await _search(f'isrc:{isrc}', 'track', 1)
            if res:
                tracks = res.get('tracks', {}).get('items', [])
                if tracks:
                    album_obj = tracks[0].get('album') or {}
                    img = await _download_largest(album_obj.get('images') or [])
                    if img:
                        return img

        # 2) Album queries, from strictest to loosest; duplicates removed while
        # keeping order so the same request is not sent twice.
        norm_artist = normalize_name(artist)
        norm_album = normalize_name(album)
        queries = list(dict.fromkeys([
            f'album:"{album}" artist:"{artist}"',
            f'artist:{artist} album:{album}',
            f'album:{norm_album} artist:{norm_artist}',
            f'album:"{album}"',
            f'artist:"{artist}"',
        ]))
        for q in queries:
            res = await _search(q, 'album', 3)
            if not res:
                continue
            albums = res.get('albums', {}).get('items', [])
            if COVER_DEBUG_QUERIES:
                try:
                    dbg = [
                        {
                            'id': a.get('id'),
                            'name': a.get('name'),
                            'artists': [ar.get('name') for ar in (a.get('artists') or [])[:3] if ar.get('name')],
                            'images': [im.get('url') for im in (a.get('images') or [])[:3]],
                        }
                        for a in (albums or [])[:3]
                    ]
                    console.print(f"[cyan][DEBUG] Spotify album candidates for query '{q}': {dbg}[/cyan]")
                except Exception:
                    pass
            if not albums:
                continue
            # Accept the first candidate whose artist (and album, if given)
            # fuzzily matches what we expect.
            for a in albums:
                found_album = a.get('name') or ''
                found_artist = ' '.join([ar.get('name') for ar in (a.get('artists') or []) if ar.get('name')])
                artist_ok = is_fuzzy_match(artist, found_artist, threshold=75)
                album_ok = (not album) or is_fuzzy_match(album, found_album, threshold=70)
                if not (artist_ok and album_ok):
                    continue
                img = await _download_largest(a.get('images') or [])
                if img:
                    return img
        return None
    except Exception:
        return None
# Fuzzy match helper for metadata
def is_fuzzy_match(expected, actual, threshold=80):
    """Tell whether two strings are similar enough, ignoring case.

    Uses rapidfuzz's token-set ratio; the pair matches when the score is at
    least ``threshold``. An empty or missing value on either side never matches.
    """
    if expected and actual:
        score = fuzz.token_set_ratio(expected.lower(), actual.lower())
        return score >= threshold
    return False
# Fuzzy match for all fields
def is_metadata_match(expected_artist, expected_album, expected_title, found_artist, found_album, found_title, threshold=80):
    """Check that found artist/album/title all fuzzily match the expected ones.

    Artist and title are always compared. The album is compared only when an
    expected album is supplied; otherwise it counts as matching.
    """
    comparisons = [(expected_artist, found_artist)]
    if expected_album:
        comparisons.append((expected_album, found_album))
    comparisons.append((expected_title, found_title))
    return all(is_fuzzy_match(wanted, got, threshold) for wanted, got in comparisons)
# Utility to normalize artist/song names for searching
def normalize_name(name):
    """Normalize an artist/album/track name for use in search queries.

    Lower-cases the text, removes a trailing numeric disambiguation suffix such
    as "(2)" or "(12)", strips common punctuation, turns underscores into
    spaces and collapses runs of whitespace into a single space.

    Args:
        name: The name to normalize; ``None`` or empty yields "".

    Returns:
        The normalized name with no leading, trailing or repeated spaces.
    """
    if not name:
        return ""
    name = str(name).lower().strip()
    # Remove "(1)", "(23)", ... at the end, together with the space before it.
    name = re.sub(r"\s*\([0-9]+\)$", "", name)
    name = re.sub(r"[\(\)\[\]\{\}\'\"\!\?\.,:;`~@#$%^&*+=|\\/<>]", "", name)
    # Collapse whitespace AFTER punctuation removal: dropping a symbol that
    # stood between two spaces ("a & b") would otherwise leave a double space.
    name = re.sub(r"[\s_]+", " ", name)
    return name.strip()
def artist_credit_to_name(ac):
    """Safely convert a MusicBrainz artist-credit array into a single artist name string.

    Each dict entry contributes its own 'name' or, failing that, the 'name' of
    its nested 'artist' dict. Entries that are not dicts are skipped.

    Args:
        ac: The artist-credit list; ``None`` or empty yields "".

    Returns:
        The collected names joined by single spaces.
    """
    parts = []
    for credit in ac or []:
        if not isinstance(credit, dict):
            continue
        # Common formats: {'name': 'Artist Name'} or {'artist': {'name': 'Artist Name'}}
        name = credit.get('name')
        if not name:
            nested = credit.get('artist')
            if isinstance(nested, dict):
                name = nested.get('name')
        if name:
            parts.append(str(name))
    return " ".join(parts)
# Silence chatty third-party loggers (aiohttp, urllib3, etc.): only CRITICAL
# records get through, and they are not handed on to ancestor loggers.
for noisy_logger in (
    "aiohttp.client",
    "aiohttp.server",
    "aiohttp.access",
    "urllib3",
    "asyncio",
    "chardet",
    "requests.packages.urllib3",
):
    silenced = logging.getLogger(noisy_logger)
    silenced.setLevel(logging.CRITICAL)
    silenced.propagate = False
# The root logger is raised to CRITICAL too, for anything not listed above.
logging.getLogger().setLevel(logging.CRITICAL)
async def fetch_srutil_cover(sr, artist, song, session: aiohttp.ClientSession, limiter: AsyncRateLimiter):
    """Fetch cover art through the project's SRUtil wrapper.

    Resolves an album with ``sr.get_album_by_name(artist, song)``, asks
    ``sr.get_cover_by_album_id`` for a 640px cover URL, then downloads it.
    Returns the image bytes, or ``None`` when any step yields nothing or fails
    (failures are reported on the console).
    """
    try:
        found = await sr.get_album_by_name(artist, song)
        album_id = found.get('id') if found else None
        if not album_id:
            return None

        cover_url = await sr.get_cover_by_album_id(album_id, 640)
        if not cover_url:
            return None

        await limiter.acquire()
        try:
            async with session.get(cover_url, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                if resp.status == 200:
                    return await resp.read()
                console.print(f"[red]SRUtil: Failed to fetch cover art from URL (status {resp.status}): {cover_url}[/red]")
        except Exception as e:
            console.print(f"[red]SRUtil: Exception fetching cover url: {e}[/red]")
    except Exception as e:
        # One known dependency failure gets a friendlier explanation.
        if "Cannot combine AUTHORIZATION header with AUTH argument" in str(e):
            console.print("[red]SRUtil: Skipping due to conflicting authentication method in dependency (AUTHORIZATION header + AUTH argument).[/red]")
        else:
            console.print(f"[red]SRUtil: Exception: {e}[/red]")
    return None
async def get_isrc(file):
    """Read the ISRC tag of an audio file in a worker thread.

    Looks up the 'isrc' tag and, if that lookup raises, the 'ISRC' tag. A list
    value is reduced to its first element. Returns ``None`` when no tag can be
    read, the list is empty, or reading the file fails (reported on the console).
    """
    def _read_isrc():
        tags = load_file(file)
        value = None
        # music_tag may expose the ISRC under either spelling.
        for key in ('isrc', 'ISRC'):
            try:
                value = tags[key].value
            except Exception:
                continue
            break
        if isinstance(value, list):
            return value[0] if value else None
        return value

    try:
        return await asyncio.to_thread(_read_isrc)
    except Exception as e:
        console.print(f"[red]Error reading ISRC for {file}: {e}[/red]")
        return None
async def search_musicbrainz_by_isrc(session, isrc, limiter: AsyncRateLimiter):
    """Find cover art for a track from its ISRC via MusicBrainz + Cover Art Archive.

    Looks the ISRC up on MusicBrainz (requesting the releases of each
    recording), then tries the Cover Art Archive front image of those
    releases, a bounded number of them, until one downloads.

    Args:
        session: Shared aiohttp session.
        isrc: The ISRC to look up; falsy values return ``None`` immediately.
        limiter: Rate limiter awaited before every HTTP request made here.

    Returns:
        Image bytes, or ``None`` when nothing is found or an error occurs.
    """
    if not isrc:
        return None
    from urllib.parse import quote

    headers = {"User-Agent": "cover-art-script/1.0"}
    # inc=releases is required: without it the recordings returned by the ISRC
    # lookup carry no 'releases' key and no cover can ever be resolved.
    url = f"https://musicbrainz.org/ws/2/isrc/{quote(str(isrc).strip(), safe='')}?inc=releases&fmt=json"
    # Cap on Cover Art Archive requests for a single ISRC.
    max_attempts = 5
    try:
        await limiter.acquire()
        timeout = aiohttp.ClientTimeout(total=15)
        async with session.get(url, headers=headers, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            try:
                data = await resp.json()
            except Exception:
                return None

        release_ids = []
        for rec in data.get('recordings') or []:
            for rel in rec.get('releases') or []:
                relid = rel.get('id')
                if relid and relid not in release_ids:
                    release_ids.append(relid)

        for relid in release_ids[:max_attempts]:
            caa_url = f"https://coverartarchive.org/release/{relid}/front-500"
            try:
                await limiter.acquire()
                async with session.get(caa_url, timeout=timeout) as caa_resp:
                    if caa_resp.status == 200:
                        console.print(f"[green]Found cover art via ISRC {isrc}[/green]")
                        return await caa_resp.read()
            except Exception:
                # Try the next release rather than giving up on the ISRC.
                continue
        return None
    except Exception as e:
        console.print(f"[red]MusicBrainz ISRC lookup exception for {isrc}: {e}[/red]")
        return None
# Concurrency limit for async processing: main() sizes the asyncio.Semaphore
# passed to process_file() with this value.
CONCURRENCY = 18
# Helper for formatting failure reasons in a consistent way
def format_failure_reason(e, resp_status=None):
    """Turn an exception and/or HTTP status into a short failure description.

    Checked in order: timeout, aiohttp client error, a truthy ``resp_status``,
    any other truthy ``e``; with none of those the result is "no match found".
    """
    if isinstance(e, asyncio.TimeoutError):
        return "timeout"
    if isinstance(e, aiohttp.ClientError):
        return f"network error: {str(e)}"
    if resp_status:
        return f"HTTP {resp_status}"
    return str(e) if e else "no match found"
async def process_file(file, sr, table, results, sem, session: aiohttp.ClientSession, limiters: dict):
    """Process a single audio file to find and embed cover art.

    Files that already carry artwork are recorded as "Already Present".
    Otherwise the album cache is consulted and, on a miss, the providers are
    tried in a fixed order until one returns an image: MusicBrainz by ISRC
    (when the file has one), SRUtil, MusicBrainz, Discogs, Deezer, Spotify,
    iTunes (album), iTunes (track), Last.fm. When ONLY_SPOTIFY is set, only
    Spotify is tried and the cache is not consulted. The image is then
    embedded and the outcome is added to ``table`` and ``results``.

    Args:
        file: Path of the audio file.
        sr: SRUtil instance passed through to fetch_srutil_cover().
        table: rich Table receiving one (file, status, source) row.
        results: List receiving one [file, status, source] entry (CSV rows).
        sem: Semaphore bounding how many files are processed concurrently.
        session: Shared aiohttp session.
        limiters: Rate limiters keyed by 'srutil', 'musicbrainz', 'itunes',
            'discogs', 'deezer', 'lastfm' and 'spotify'.
    """
    def _record(status, source):
        label = source if source else "-"
        table.add_row(file, status, label)
        results.append([file, status, label])

    async with sem:
        if await has_cover(file):
            _record("Already Present", None)
            return

        artist, album, title = await get_artist_album_title(file)
        file_basename = os.path.basename(file)
        album_key = (artist, album)
        # The "Unknown Album" placeholder would make unrelated files share one
        # cache entry, so such files neither read nor write the cache.
        cacheable = album != "Unknown Album"

        image_bytes = None
        source = None
        if cacheable and not ONLY_SPOTIFY:
            image_bytes = ALBUM_ART_CACHE.get(album_key)
            if image_bytes:
                # Give cache hits a source so they are embedded and reported
                # as a success instead of being silently dropped.
                source = "Cache"

        providers = []
        if not image_bytes:
            isrc = await get_isrc(file)
            spotify = ("Spotify", lambda: search_spotify_cover(session, artist, album, limiters['spotify'], isrc))
            if ONLY_SPOTIFY:
                # Testing mode: attempt only Spotify.
                providers = [spotify]
            else:
                if isrc:
                    providers.append((
                        f"MusicBrainz (ISRC:{isrc})",
                        lambda: search_musicbrainz_by_isrc(session, isrc, limiters['musicbrainz']),
                    ))
                providers += [
                    ("SRUtil", lambda: fetch_srutil_cover(sr, artist, album, session, limiters['srutil'])),
                    ("MusicBrainz", lambda: search_musicbrainz_cover(artist, album, session, limiters['musicbrainz'])),
                    ("Discogs", lambda: search_discogs_cover(artist, album, session, limiters['discogs'])),
                    ("Deezer", lambda: search_deezer_cover(session, artist, album, limiters['deezer'])),
                    spotify,
                    ("iTunes(album)", lambda: search_itunes_cover(session, artist, album, limiters['itunes'])),
                    ("iTunes(track)", lambda: search_itunes_track(session, artist, title, limiters['itunes'])),
                    ("LastFM", lambda: search_lastfm_cover(session, artist, album, limiters['lastfm'])),
                ]

        for provider_name, lookup in providers:
            img = await lookup()
            if not img:
                _log_attempt(artist, album, title, provider_name, "No match")
                continue
            image_bytes = img
            source = provider_name
            if cacheable:
                ALBUM_ART_CACHE[album_key] = image_bytes
            _log_attempt(artist, album, title, provider_name, "Success")
            break

        # Embed and summary
        status = "Failed"
        if image_bytes and source:
            if await embed_cover(file, image_bytes):
                status = "Success"
                console.print(f"[green][FINAL RESULT] {file_basename} | {artist} / {album} | Success via {source}[/green]")
            else:
                status = "Embed Failed"
                console.print(f"[red][FINAL RESULT] {file_basename} | {artist} / {album} | Embed Failed from {source}[/red]")
        elif ONLY_SPOTIFY:
            console.print(f"[yellow][FINAL RESULT] {file_basename} | {artist} / {album} | No Spotify cover art found[/yellow]")
        else:
            console.print(f"[yellow][FINAL RESULT] {file_basename} | {artist} / {album} | No cover art found[/yellow]")
        _record(status, source)
async def has_cover(file):
    """Check if the audio file already has embedded cover art.

    The tag read is blocking file I/O, so it runs in a worker thread to keep
    the event loop free for the other concurrently processed files.

    Args:
        file: Path of the audio file.

    Returns:
        True when any artwork is present; False when there is none or the
        file cannot be read.
    """
    def _check():
        # music_tag stores artwork in 'artwork', which may be a list-like field.
        art = load_file(file)['artwork']
        try:
            return bool(art.first)
        except Exception:
            # Fallback if .first is not available.
            return bool(art)

    try:
        return await asyncio.to_thread(_check)
    except Exception:
        return False
async def get_artist_album_title(file):
    """Extract artist, album, and title from audio file tags.

    The tag read is blocking file I/O, so it runs in a worker thread.

    Args:
        file: Path of the audio file.

    Returns:
        A tuple ``(artist, album, title)`` of strings. A missing or empty tag
        becomes "Unknown Artist" / "Unknown Album" / "Unknown Title"; if the
        file cannot be read at all, all three placeholders are returned.
    """
    def _read():
        tags = load_file(file)

        def _first(key, default):
            value = tags[key].first
            return str(value) if value else default

        return (
            _first('artist', "Unknown Artist"),
            _first('album', "Unknown Album"),
            _first('title', "Unknown Title"),
        )

    try:
        return await asyncio.to_thread(_read)
    except Exception:
        return "Unknown Artist", "Unknown Album", "Unknown Title"
async def embed_cover(file, image_bytes):
    """Embed cover art into audio file metadata using music_tag.

    Loading and saving the file is blocking I/O, so it runs in a worker thread.

    Args:
        file: Path of the audio file to modify.
        image_bytes: Raw image data to store as the file's artwork.

    Returns:
        True on success; False when loading, tagging or saving fails (the
        error is printed to the console).
    """
    def _write():
        tags = load_file(file)
        tags['artwork'] = image_bytes
        tags.save()

    try:
        await asyncio.to_thread(_write)
        return True
    except Exception as e:
        console.print(f"[red][ERROR] Failed to embed cover: {e}[/red]")
        return False
async def main():
    """Scan MUSIC_DIR, fetch and embed missing cover art, then write the reports.

    Every file under MUSIC_DIR whose extension is in AUDIO_EXTS is handed to
    process_file(), with at most CONCURRENCY files in flight. A failure while
    processing one file is recorded as an "Error" row and does not stop the
    run. Afterwards the summary table is printed and REPORT_CSV is written.
    Any other unhandled error prints a traceback and exits with status 1.
    """
    try:
        console.print(f"[bold blue]Scanning directory: {MUSIC_DIR}[/bold blue]")
        sr = SRUtil()
        results = []
        files = [
            os.path.join(root, fn)
            for root, _, filenames in os.walk(MUSIC_DIR)
            for fn in filenames
            if os.path.splitext(fn)[1].lower() in AUDIO_EXTS
        ]

        table = Table(title="Cover Art Embedding Report")
        table.add_column("File", style="cyan", overflow="fold")
        table.add_column("Status", style="green")
        table.add_column("Source", style="magenta")

        # Minimum seconds between requests, per provider.
        rates = {
            'srutil': 0.1,
            'musicbrainz': 1.0,
            'itunes': 0.5,
            'discogs': 1.0,
            'deezer': 0.5,
            'lastfm': 1.0,
            'spotify': 0.5,
        }
        limiters = {name: AsyncRateLimiter(rate) for name, rate in rates.items()}
        sem = asyncio.Semaphore(CONCURRENCY)

        async with aiohttp.ClientSession() as session:
            # Share the module console so log lines and the progress bar are
            # rendered by the same live display.
            with Progress(
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TaskProgressColumn(),
                TimeElapsedColumn(),
                console=console,
            ) as progress:
                task_id = progress.add_task("Processing files...", total=len(files))

                async def worker(file):
                    try:
                        await process_file(file, sr, table, results, sem, session, limiters)
                    except Exception as exc:
                        # Keep going: one bad file must not cost the whole report.
                        console.print(f"[red][ERROR] {file}: {exc}[/red]")
                        table.add_row(file, "Error", "-")
                        results.append([file, "Error", "-"])
                    finally:
                        progress.update(task_id, advance=1)

                # Schedule all workers; the semaphore inside process_file()
                # bounds the real concurrency.
                await asyncio.gather(*(worker(file) for file in files))

        # Print summary table and CSV after progress bar
        console.print(table)
        # Explicit UTF-8: file paths may contain non-ASCII characters.
        with open(REPORT_CSV, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["File", "Status", "Source"])
            writer.writerows(results)
        console.print(f"[bold green]CSV report written to {REPORT_CSV}[/bold green]")
    except Exception as e:
        console.print(f"[red][ERROR] Unhandled exception: {e}[/red]")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())