Files
api/test/add_cover_art.py
2025-09-23 13:17:34 -04:00

1300 lines
48 KiB
Python

# Standard library
import os
import sys
import re
import csv
import asyncio
import logging
import traceback
# Third-party
import aiohttp
from pathlib import Path
from dotenv import load_dotenv
from rapidfuzz import fuzz
from music_tag import load_file # type: ignore
from rich.console import Console
from rich.table import Table
from rich.progress import (
Progress,
BarColumn,
TextColumn,
TimeElapsedColumn,
TaskProgressColumn,
)
# Local imports
sys.path.insert(0, "..")
from utils.sr_wrapper import SRUtil
import musicbrainzngs # type: ignore
from discogs_client import Client # type: ignore
# typing helper
from typing import Any, cast, Optional
# Optional: use the popular `itunespy` PyPI package when available
try:
import itunespy # type: ignore
HAVE_ITUNESPY = True
except Exception:
itunespy = None
HAVE_ITUNESPY = False
# Optional: use `spotipy` when available for Spotify lookups
try:
import spotipy # type: ignore
HAVE_SPOTIPY = True
except Exception:
spotipy = None
HAVE_SPOTIPY = False
# Reminder: If you see 'Import "music_tag" could not be resolved', run:
# uv add music-tag
# Configurable paths and extensions
# Root directory that main() walks recursively looking for audio files.
MUSIC_DIR = Path("/storage/music2/completed/FLAC/review")
# File extensions (lowercase, with leading dot) that main() treats as audio.
AUDIO_EXTS = {".flac", ".mp3", ".m4a", ".ogg", ".wav", ".aac"}
# Path of the CSV summary written by main() at the end of a run.
REPORT_CSV = "cover_art_report.csv"
# In-memory artwork cache; process_file() keys it by (artist, album) tuples.
ALBUM_ART_CACHE: dict = {}
# Reminder: If you see 'Import "music_tag" could not be resolved', run:
# uv add music-tag
# Characters that carry meaning in Lucene query syntax and must be escaped
# before tag text is embedded in a MusicBrainz search query.
_LUCENE_SPECIAL_RE = re.compile(r'([+\-&|!(){}\[\]^"~*?:\\/])')


def _lucene_term(value):
    """Return *value* escaped for Lucene and lowercased.

    Lowercasing keeps words such as "AND"/"OR"/"NOT" inside a name from being
    parsed as boolean operators.
    """
    return _LUCENE_SPECIAL_RE.sub(r"\\\1", str(value)).lower()


async def search_musicbrainz_cover(
    artist, album, session: aiohttp.ClientSession, limiter: "AsyncRateLimiter"
):
    """Find front cover art through MusicBrainz release groups.

    Searches release groups matching *artist* and *album* (in a worker thread,
    because musicbrainzngs is synchronous), then asks the Cover Art Archive
    for the 500px front image of each release listed in the results.

    Args:
        artist: Artist name taken from the file tags.
        album: Album name taken from the file tags.
        session: Shared aiohttp session used for the image downloads.
        limiter: Rate limiter awaited before every Cover Art Archive request.

    Returns:
        The image bytes of the first cover that downloads with HTTP 200, or
        None when nothing is found or an error occurs.
    """
    try:
        # Group each field's words in parentheses and escape them, so that
        # multi-word names stay bound to their field and punctuation in tags
        # cannot break the query syntax.
        query = (
            f"artist:({_lucene_term(artist)}) "
            f"AND release:({_lucene_term(album)})"
        )
        try:
            # search for release-groups using a thread to avoid blocking
            res = await asyncio.to_thread(
                musicbrainzngs.search_release_groups, query, 5
            )
        except Exception:
            res = {}
        rgs = res.get("release-group-list") or []
        if COVER_DEBUG_QUERIES:
            try:
                dbg_info = [
                    {
                        "id": rg.get("id"),
                        "title": rg.get("title"),
                        "artist": artist_credit_to_name(
                            rg.get("artist-credit", [])
                        ),
                    }
                    for rg in rgs[:3]
                ]
                console.print(
                    f"[cyan][DEBUG] MusicBrainz candidates: {dbg_info}[/cyan]"
                )
            except Exception:
                # Debug output must never affect the lookup itself.
                pass
        timeout = aiohttp.ClientTimeout(total=15)
        for rg in rgs:
            # try to get cover art via Cover Art Archive for releases in the group
            for rel in rg.get("release-list") or []:
                relid = rel.get("id")
                if not relid:
                    continue
                caa_url = f"https://coverartarchive.org/release/{relid}/front-500"
                try:
                    await limiter.acquire()
                    async with session.get(caa_url, timeout=timeout) as resp:
                        if resp.status == 200:
                            return await resp.read()
                except Exception:
                    # A failed download only disqualifies this release.
                    continue
        return None
    except Exception as e:
        console.print(f"[red]MusicBrainz search exception: {e}[/red]")
        return None
async def search_discogs_cover(
    artist, album, session: aiohttp.ClientSession, limiter: "AsyncRateLimiter"
):
    """Find cover art through the Discogs database search.

    Runs a release search for *album* filtered by *artist*; when that yields
    nothing, retries once with a single normalized "artist album" string.
    The first candidate whose image downloads with HTTP 200 wins.

    Args:
        artist: Artist name taken from the file tags.
        album: Album name taken from the file tags.
        session: Shared aiohttp session used for the image downloads.
        limiter: Rate limiter awaited before each Discogs search and download.

    Returns:
        Image bytes, or None when no token is configured, nothing matches, or
        an error occurs.
    """
    # Upper bound on candidates pulled from the paginated result list.
    max_results = 10

    def _search(query, **fields):
        """Run a Discogs search and materialise the first few results.

        Consuming the lazy result list here keeps its HTTP requests inside the
        worker thread and bounds how many result pages are fetched.
        """
        from itertools import islice

        return list(islice(discogs_client.search(query, **fields), max_results))

    try:
        if not DISCOGS_TOKEN:
            return None
        try:
            await limiter.acquire()
            if COVER_DEBUG_QUERIES:
                console.print(
                    f"[cyan][DEBUG] Discogs query: album='{album}' artist='{artist}'"
                )
            # Search filters must be keyword arguments; a positional dict is
            # treated as a query term, which previously made every search fail.
            results = await asyncio.to_thread(
                _search, album, artist=artist, type="release"
            )
        except Exception as e:
            if COVER_DEBUG_QUERIES:
                console.print(f"[cyan][DEBUG] Discogs search failed: {e}[/cyan]")
            results = []
        if COVER_DEBUG_QUERIES:
            try:
                dbg = []
                for rr in results[:3]:
                    data = getattr(rr, "data", {}) or {}
                    dbg.append(
                        {
                            "id": data.get("id"),
                            "title": data.get("title")
                            or getattr(rr, "title", None),
                            "cover_image": data.get("cover_image"),
                        }
                    )
                console.print(f"[cyan][DEBUG] Discogs candidates: {dbg}[/cyan]")
            except Exception:
                pass
        if not results:
            # conservative normalized fallback: try a combined normalized string
            try:
                await limiter.acquire()
                combined = f"{normalize_name(artist)} {normalize_name(album)}"
                if COVER_DEBUG_QUERIES:
                    console.print(f"[cyan][DEBUG] Discogs fallback query: {combined}")
                results = await asyncio.to_thread(_search, combined, type="release")
            except Exception:
                results = []
        timeout = aiohttp.ClientTimeout(total=15)
        for r in results:
            # r.data may contain 'cover_image' or an 'images' list
            data = getattr(r, "data", None) or {}
            cover = data.get("cover_image")
            if not cover:
                imgs = data.get("images") or []
                if imgs and isinstance(imgs, list) and isinstance(imgs[0], dict):
                    cover = imgs[0].get("uri")
            if not cover:
                continue
            try:
                await limiter.acquire()
                async with session.get(cover, timeout=timeout) as resp:
                    if resp.status == 200:
                        return await resp.read()
            except Exception:
                # Try the next candidate when this download fails.
                continue
        return None
    except Exception as e:
        console.print(f"[red]Discogs search exception: {e}[/red]")
        return None
# Load env once
load_dotenv()
# Console for pretty output
console = Console()


def _env_flag(name):
    """Return True when env var *name* is "1", "true" or "yes" (any case)."""
    return os.getenv(name, "").lower() in ("1", "true", "yes")


# When enabled, process_file() runs only the Spotify search (quick testing).
ONLY_SPOTIFY = _env_flag("ONLY_SPOTIFY")
# When enabled, search helpers print their queries and top candidates.
COVER_DEBUG_QUERIES = _env_flag("COVER_DEBUG_QUERIES")
# Opt-in switch for more aggressive matching; not referenced in the portion
# of the file visible here.
COVER_AGGRESSIVE = _env_flag("COVER_AGGRESSIVE")
def _log_attempt(artist, album, title, source, result):
    """Report one lookup attempt on the console and in `search_attempts.log`.

    Missing artist/album/title values are shown as "Unknown ..." placeholders.
    *result* is a short string such as 'Success', 'No match', 'Timeout', or an
    error message. This function never raises.
    """
    try:
        who = artist if artist else "Unknown Artist"
        record = album if album else "Unknown Album"
        track = title if title else "Unknown Title"
        line = (
            f"SEARCH: {who} - {record} / {track} | Source: {source} | Result: {result}"
        )
        console.print(line)
    except Exception:
        # Never crash logging; without a console line nothing is written to disk.
        return
    try:
        with open("search_attempts.log", "a", encoding="utf-8") as log_file:
            log_file.write(line + "\n")
    except Exception:
        # The on-disk log is best-effort only.
        pass
# Define a lightweight async rate limiter
class AsyncRateLimiter:
    """Space out callers so acquisitions are at least `rate_seconds` apart.

    Callers queue on an internal lock; each one sleeps for whatever is left of
    the interval since the previous acquisition before proceeding.
    """

    def __init__(self, rate_seconds: float):
        # Event-loop timestamp of the most recent acquisition.
        self._last = 0.0
        self._lock = asyncio.Lock()
        # Minimum number of seconds between two acquisitions.
        self._rate = float(rate_seconds)

    async def acquire(self) -> None:
        """Wait until the minimum interval since the last acquisition has passed."""
        async with self._lock:
            clock = asyncio.get_event_loop().time
            remaining = self._rate - (clock() - self._last)
            if remaining > 0:
                await asyncio.sleep(remaining)
            # Stamp after sleeping so the next caller measures from now.
            self._last = clock()
# Initialize MusicBrainz client
# NOTE(review): the contact address is a placeholder — presumably it should be
# replaced with a real contact before running against the live service; confirm.
musicbrainzngs.set_useragent("cover-art-script", "1.0", "your-email@example.com")
# Initialize Discogs client
# DISCOGS_TOKEN is None when the variable is unset; search_discogs_cover()
# returns early in that case.
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN")
discogs_client = Client("cover-art-script/1.0", user_token=DISCOGS_TOKEN)
# Define the log_api_response function at the top of the script
async def log_api_response(api_name, response):
    """Print a trimmed view of an API JSON response for debugging.

    For "MusicBrainz", "Discogs" and "iTunes" only a few identifying fields of
    each result are printed; for any other *api_name* the whole decoded
    payload is printed. Decoding or formatting failures are reported on the
    console rather than raised.

    Args:
        api_name: Name of the service the response came from.
        response: Object exposing an awaitable ``json()`` method.
    """
    # Keys copied verbatim from each entry of the payload's "results" list.
    result_fields = {
        "Discogs": ("id", "title", "cover_image"),
        "iTunes": ("collectionId", "collectionName", "artworkUrl100"),
    }
    try:
        data = await response.json()
        if api_name == "MusicBrainz":
            relevant_info = []
            for group in data.get("release-groups", []):
                relevant_info.append(
                    {
                        "id": group.get("id"),
                        "title": group.get("title"),
                        "artist": artist_credit_to_name(
                            group.get("artist-credit", [])
                        ),
                    }
                )
        elif api_name in result_fields:
            wanted = result_fields[api_name]
            relevant_info = [
                {key: entry.get(key) for key in wanted}
                for entry in data.get("results", [])
            ]
        else:
            console.print(f"[cyan][DEBUG] {api_name} response: {data}[/cyan]")
            return
        console.print(
            f"[cyan][DEBUG] {api_name} relevant response: {relevant_info}[/cyan]"
        )
    except Exception as e:
        console.print(f"[red][DEBUG] Failed to parse {api_name} response: {e}[/red]")
# Helper to strip common parenthetical tags from album names
def strip_album_tags(album):
    """Drop one trailing parenthetical tag such as "(Remastered)" or "(2019)".

    Only a parenthesised group at the very end of *album* whose whole content
    is one of the known tags is removed (case-insensitive); the result is
    stripped of surrounding whitespace.
    """
    trailing_tag = re.compile(
        r"\s*\((deluxe|remaster(ed)?|original mix|expanded|bonus|edition|version|mono|stereo|explicit|clean|anniversary|special|reissue|expanded edition|bonus track(s)?|international|digital|single|ep|live|instrumental|karaoke|radio edit|explicit version|clean version|acoustic|demo|re-recorded|remix|mix|edit|feat\.?|featuring|with .+|from .+|soundtrack|ost|score|session|vol(ume)? ?\d+|disc ?\d+|cd ?\d+|lp ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])\)$",
        re.IGNORECASE,
    )
    cleaned = trailing_tag.sub("", album)
    return cleaned.strip()
# Helper to strip common trailing tags like EP, LP, Single, Album, etc. from album names
def strip_album_suffix(album):
    """Remove one trailing tag such as " EP", " - Deluxe" or ": Remix".

    The tag must be preceded by at least one separator (whitespace, "-", "_"
    or ":"), so titles that merely end in the same letters ("Sleep", "Help")
    or consist only of a tag-like token ("1989") are left intact. Matching is
    case-insensitive and only applies at the end of the string.

    Args:
        album: Album title.

    Returns:
        The title without the trailing tag, stripped of surrounding whitespace.
    """
    # `+` (not `*`) after the separator class: an optional separator let the
    # pattern bite into the tail of ordinary words.
    suffix_pattern = r"[\s\-_:]+(ep|lp|single|album|remix|version|edit|mix|deluxe|expanded|anniversary|reissue|instrumental|karaoke|ost|score|session|mono|stereo|explicit|clean|bonus|disc ?\d+|cd ?\d+|vinyl|202[0-9]|20[0-1][0-9]|19[0-9][0-9])$"
    return re.sub(suffix_pattern, "", album, flags=re.IGNORECASE).strip()
# iTunes/Apple Music API fallback (async)
# Attribute/key spellings probed for the artwork URL of an iTunes result.
# NOTE(review): which spelling itunespy actually exposes is not visible from
# this file, so several candidates are tried — confirm against the library.
_ITUNES_ARTWORK_KEYS = (
    "artwork_url",
    "artworkUrl100",
    "artwork_url_100",
    "artwork_url100",
)


def _itunes_lookup(mod, term, kind):
    """Call whichever itunespy search entry point exists for *kind*.

    *kind* is "album" or "song". Returns the library's result, or None when no
    entry point is available or the call raises.
    """
    func_name, cls_name = (
        ("search_album", "Album") if kind == "album" else ("search_track", "Track")
    )
    try:
        # try common itunespy APIs safely
        if hasattr(mod, func_name):
            return getattr(mod, func_name)(term)
        cls = getattr(mod, cls_name, None)
        if cls is not None and hasattr(cls, "search"):
            return cls.search(term)
        if hasattr(mod, "search"):
            return mod.search(term, entity=kind)
        return None
    except Exception:
        return None


def _itunes_field(item, *names):
    """Return the first truthy value among *names* on an object or dict result."""
    for name in names:
        value = item.get(name) if isinstance(item, dict) else getattr(item, name, None)
        if value:
            return value
    return None


async def _itunes_search_and_fetch(session, artist, name, limiter, kind, label):
    """Search iTunes for "artist name" and download the first result's artwork.

    Tries the raw query first and a normalized query second; the limiter is
    awaited before each API search and before the image download.
    """
    mod = cast(Any, itunespy)
    await limiter.acquire()
    found = await asyncio.to_thread(_itunes_lookup, mod, f"{artist} {name}", kind)
    if COVER_DEBUG_QUERIES and found:
        try:
            dbg = [
                {
                    "id": _itunes_field(a, "collectionId"),
                    "name": _itunes_field(a, "collectionName"),
                }
                for a in found[:3]
            ]
            console.print(f"[cyan][DEBUG] iTunes {label} candidates: {dbg}[/cyan]")
        except Exception:
            pass
    if not found:
        if COVER_DEBUG_QUERIES:
            console.print(
                f"[cyan][DEBUG] iTunes {label}: no results for '{artist} - {name}', trying normalized fallback"
            )
        norm_q = f"{normalize_name(artist)} {normalize_name(name)}"
        await limiter.acquire()
        found = await asyncio.to_thread(_itunes_lookup, mod, norm_q, kind)
        if not found:
            return None
    art_url = _itunes_field(found[0], *_ITUNES_ARTWORK_KEYS)
    if not art_url:
        return None
    # Normalize to higher-res if possible
    if "100x100" in art_url:
        art_url = art_url.replace("100x100bb", "600x600bb")
    await limiter.acquire()
    img_timeout = aiohttp.ClientTimeout(total=15)
    try:
        async with session.get(art_url, timeout=img_timeout) as img_resp:
            if img_resp.status == 200:
                return await img_resp.read()
    except Exception:
        return None
    return None


async def search_itunes_cover(
    session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter"
):
    """Look up album artwork on iTunes through the optional `itunespy` package.

    Args:
        session: Shared aiohttp session used for the image download.
        artist: Artist name taken from the file tags.
        album: Album name taken from the file tags.
        limiter: Rate limiter awaited before each iTunes request.

    Returns:
        Image bytes, or None when itunespy is unavailable, nothing matches, or
        an error occurs.
    """
    if not HAVE_ITUNESPY:
        console.print(
            f"[yellow]iTunes: itunespy not available; skipping iTunes album search for '{artist} - {album}'[/yellow]"
        )
        return None
    try:
        return await _itunes_search_and_fetch(
            session, artist, album, limiter, "album", "album"
        )
    except Exception as e:
        console.print(f"[red][ERROR] itunespy album search exception: {e}[/red]")
        return None


async def search_itunes_track(
    session: aiohttp.ClientSession, artist, title, limiter: "AsyncRateLimiter"
):
    """Look up artwork on iTunes by track title through `itunespy`.

    Args:
        session: Shared aiohttp session used for the image download.
        artist: Artist name taken from the file tags.
        title: Track title taken from the file tags.
        limiter: Rate limiter awaited before each iTunes request.

    Returns:
        Image bytes, or None when itunespy is unavailable, nothing matches, or
        an error occurs.
    """
    if not HAVE_ITUNESPY:
        console.print(
            f"[yellow]iTunes: itunespy not available; skipping iTunes track search for '{artist} - {title}'[/yellow]"
        )
        return None
    try:
        return await _itunes_search_and_fetch(
            session, artist, title, limiter, "song", "track"
        )
    except Exception as e:
        console.print(f"[red][ERROR] itunespy track search exception: {e}[/red]")
        return None
async def search_deezer_cover(
    session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter"
):
    """Search Deezer for an album cover. Uses Deezer public API (no auth).

    Queries "artist album" first and a normalized form second, then downloads
    the largest cover URL offered by the first hit.

    Args:
        session: Shared aiohttp session used for the API and image requests.
        artist: Artist name taken from the file tags.
        album: Album name taken from the file tags.
        limiter: Rate limiter awaited before every Deezer request.

    Returns:
        Image bytes, or None when nothing is found or any request fails.
    """
    try:
        from urllib.parse import quote

        api_timeout = aiohttp.ClientTimeout(total=10)

        async def _search(term):
            """Return the album hits for *term*, or None on a non-200 reply."""
            # Every API call goes through the limiter, including the fallback.
            await limiter.acquire()
            url = f"https://api.deezer.com/search/album?q={quote(term)}&limit=1"
            async with session.get(url, timeout=api_timeout) as resp:
                if resp.status != 200:
                    return None
                payload = await resp.json()
            return payload.get("data") or []

        query = f"{artist} {album}"
        if COVER_DEBUG_QUERIES:
            console.print(f"[cyan][DEBUG] Deezer query: {query}")
        items = await _search(query)
        if items is None:
            return None
        if COVER_DEBUG_QUERIES:
            try:
                dbg = [
                    {
                        "id": it.get("id"),
                        "title": it.get("title"),
                        "cover_xl": it.get("cover_xl"),
                        "cover_big": it.get("cover_big"),
                    }
                    for it in items[:3]
                ]
                console.print(f"[cyan][DEBUG] Deezer candidates: {dbg}[/cyan]")
            except Exception:
                pass
        if not items:
            # try a conservative normalized fallback
            norm_q = f"{normalize_name(artist)} {normalize_name(album)}"
            if COVER_DEBUG_QUERIES:
                console.print(f"[cyan][DEBUG] Deezer fallback query: {norm_q}")
            items = await _search(norm_q)
            if not items:
                return None
        first = items[0]
        # prefer XL or big covers
        art_url = (
            first.get("cover_xl") or first.get("cover_big") or first.get("cover")
        )
        if not art_url:
            return None
        await limiter.acquire()
        img_timeout = aiohttp.ClientTimeout(total=15)
        async with session.get(art_url, timeout=img_timeout) as img_resp:
            if img_resp.status == 200:
                return await img_resp.read()
    except Exception:
        return None
    return None
async def search_lastfm_cover(
    session: aiohttp.ClientSession, artist, album, limiter: "AsyncRateLimiter"
):
    """Search Last.fm for album cover using album.getInfo. Requires LASTFM_API_KEY in env.

    Args:
        session: Shared aiohttp session used for the API and image requests.
        artist: Artist name taken from the file tags.
        album: Album name taken from the file tags.
        limiter: Rate limiter awaited before every Last.fm request.

    Returns:
        Image bytes of the largest available size, or None when the key is not
        configured, no image is listed, or any request fails.
    """
    LASTFM_API_KEY = os.getenv("LASTFM_API_KEY")
    if not LASTFM_API_KEY:
        console.print(
            f"[yellow]LastFM: LASTFM_API_KEY not configured; skipping LastFM search for '{artist} - {album}'[/yellow]"
        )
        return None
    try:
        params = {
            "method": "album.getinfo",
            "api_key": LASTFM_API_KEY,
            "artist": str(artist),
            "album": str(album),
            "format": "json",
        }
        # HTTPS keeps the API key out of clear text; aiohttp encodes `params`.
        url = "https://ws.audioscrobbler.com/2.0/"
        await limiter.acquire()
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, params=params, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            data = await resp.json()
        album_data = data.get("album") or {}
        # images is a list of dicts with '#text' (URL) and 'size'
        images = album_data.get("image") or []
        by_size = {
            img.get("size"): img.get("#text") for img in images if img.get("#text")
        }
        # prefer the largest size that is present
        art_url = next(
            (
                by_size[size_name]
                for size_name in ("mega", "extralarge", "large", "medium")
                if size_name in by_size
            ),
            None,
        )
        if not art_url:
            return None
        await limiter.acquire()
        img_timeout = aiohttp.ClientTimeout(total=15)
        async with session.get(art_url, timeout=img_timeout) as img_resp:
            if img_resp.status == 200:
                return await img_resp.read()
    except Exception:
        return None
    return None
_SPOTIFY_CLIENT = None


def get_spotify_client():
    """Return a cached spotipy client built from client-credentials, or None.

    The client is created on first use from SPOTIFY_CLIENT_ID and
    SPOTIFY_CLIENT_SECRET. None is returned when spotipy is not installed,
    either variable is unset, or construction fails.
    """
    global _SPOTIFY_CLIENT
    if _SPOTIFY_CLIENT is not None:
        return _SPOTIFY_CLIENT
    if not HAVE_SPOTIPY:
        return None
    app_id = os.getenv("SPOTIFY_CLIENT_ID")
    app_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
    if not (app_id and app_secret):
        return None
    try:
        from importlib import import_module

        # Resolve the classes dynamically so a partial install degrades to None.
        spotify_cls = getattr(import_module("spotipy"), "Spotify", None)
        credentials_cls = getattr(
            import_module("spotipy.oauth2"), "SpotifyClientCredentials", None
        )
        if credentials_cls is None or spotify_cls is None:
            return None
        manager = credentials_cls(client_id=app_id, client_secret=app_secret)
        _SPOTIFY_CLIENT = spotify_cls(client_credentials_manager=manager)
        return _SPOTIFY_CLIENT
    except Exception:
        return None
async def search_spotify_cover(
    session: aiohttp.ClientSession,
    artist,
    album,
    limiter: "AsyncRateLimiter",
    isrc: Optional[str] = None,
):
    """Search Spotify for album cover with multiple strategies:

    - If `isrc` provided, try track search by ISRC first.
    - Try quoted album+artist queries, then looser queries.
    - Use fuzzy matching to validate results.
    - Pick the largest available image.

    A failed ISRC image download no longer aborts the lookup; the album
    queries are still attempted.

    Args:
        session: Shared aiohttp session used for the image downloads.
        artist: Artist name taken from the file tags.
        album: Album name taken from the file tags.
        limiter: Rate limiter awaited before each image download.
        isrc: Optional ISRC of the track.

    Returns:
        Image bytes, or None when the client is unavailable, nothing matches,
        or an error occurs.
    """
    client = get_spotify_client()
    if client is None:
        console.print(
            f"[yellow]Spotify: client not configured or spotipy not installed; skipping search for '{artist} - {album}'[/yellow]"
        )
        return None

    def _sp_search(q, typ="album", limit=3):
        """Synchronous Spotify search; returns None instead of raising."""
        try:
            return client.search(q=q, type=typ, limit=limit)
        except Exception:
            return None

    def _largest_image_url(images):
        """Return the URL of the widest image in *images*, or None."""
        if not images:
            return None
        return max(images, key=lambda x: x.get("width") or 0).get("url")

    async def _download(url):
        """Fetch *url* through the limiter; None on any failure or non-200."""
        await limiter.acquire()
        try:
            async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=15)
            ) as img_resp:
                if img_resp.status == 200:
                    return await img_resp.read()
        except Exception:
            return None
        return None

    try:
        # 1) ISRC search (track -> album)
        if isrc:
            res = await asyncio.to_thread(_sp_search, f"isrc:{isrc}", "track", 1)
            tracks = (res or {}).get("tracks", {}).get("items", [])
            if tracks:
                album_obj = tracks[0].get("album") or {}
                art_url = _largest_image_url(album_obj.get("images") or [])
                if art_url:
                    img = await _download(art_url)
                    if img:
                        return img
        # 2) Album queries, from strictest to loosest
        norm_artist = normalize_name(artist)
        norm_album = normalize_name(album)
        queries = [
            f'album:"{album}" artist:"{artist}"',
            f"artist:{artist} album:{album}",
            f"album:{norm_album} artist:{norm_artist}",
            f'album:"{album}"',
            f'artist:"{artist}"',
        ]
        for q in queries:
            res = await asyncio.to_thread(_sp_search, q, "album", 3)
            if not res:
                continue
            albums = res.get("albums", {}).get("items", [])
            if COVER_DEBUG_QUERIES:
                try:
                    dbg = []
                    for a in (albums or [])[:3]:
                        dbg.append(
                            {
                                "id": a.get("id"),
                                "name": a.get("name"),
                                "artists": [
                                    ar.get("name")
                                    for ar in (a.get("artists") or [])[:3]
                                    if ar.get("name")
                                ],
                                "images": [
                                    img.get("url")
                                    for img in (a.get("images") or [])[:3]
                                ],
                            }
                        )
                    console.print(
                        f"[cyan][DEBUG] Spotify album candidates for query '{q}': {dbg}[/cyan]"
                    )
                except Exception:
                    pass
            if not albums:
                continue
            # examine candidates and accept the first that passes fuzzy matching
            for a in albums:
                found_album = a.get("name") or ""
                found_artist = " ".join(
                    ar.get("name") for ar in (a.get("artists") or []) if ar.get("name")
                )
                if not is_fuzzy_match(artist, found_artist, threshold=75):
                    continue
                if album and not is_fuzzy_match(album, found_album, threshold=70):
                    continue
                art_url = _largest_image_url(a.get("images") or [])
                if not art_url:
                    continue
                img = await _download(art_url)
                if img:
                    return img
        return None
    except Exception:
        return None
# Fuzzy match helper for metadata
def is_fuzzy_match(expected, actual, threshold=80):
    """Return True when both strings are non-empty and similar enough.

    Similarity is rapidfuzz's token-set ratio of the lowercased inputs; the
    match succeeds when it is at least *threshold*.
    """
    if expected and actual:
        score = fuzz.token_set_ratio(expected.lower(), actual.lower())
        return score >= threshold
    return False
# Fuzzy match for all fields
def is_metadata_match(
    expected_artist,
    expected_album,
    expected_title,
    found_artist,
    found_album,
    found_title,
    threshold=80,
):
    """Return True when artist, album and title all fuzzy-match.

    The album comparison is skipped (treated as matching) when no expected
    album is given. All comparisons use the same *threshold*.
    """
    checks = [
        is_fuzzy_match(expected_artist, found_artist, threshold),
        (
            True
            if not expected_album
            else is_fuzzy_match(expected_album, found_album, threshold)
        ),
        is_fuzzy_match(expected_title, found_title, threshold),
    ]
    return all(checks)
# Utility to normalize artist/song names for searching
def normalize_name(name):
    """Return a lowercase, punctuation-free, single-spaced form of *name*.

    Steps: lowercase and trim; drop a trailing single-digit counter such as
    "(1)"; delete common punctuation; collapse runs of whitespace/underscores
    into one space; trim again. Punctuation is removed before whitespace is
    collapsed so that "a & b" becomes "a b" rather than "a  b".

    Args:
        name: Artist, album or title text; None or empty yields "".

    Returns:
        The normalized string, never with leading/trailing or doubled spaces.
    """
    if not name:
        return ""
    text = str(name).lower().strip()
    text = re.sub(r"\s*\([0-9]\)$", "", text)  # remove (1), (2), etc. at end
    text = re.sub(r"[\(\)\[\]\{\}\'\"\!\?\.,:;`~@#$%^&*+=|\\/<>]", "", text)
    text = re.sub(r"[\s_]+", " ", text)
    return text.strip()
def artist_credit_to_name(ac):
    """Flatten a MusicBrainz artist-credit list into one space-joined string.

    Each dict entry contributes its own "name" or, failing that, the "name" of
    its nested "artist" dict. Non-dict entries and entries without a name are
    skipped.
    """

    def _credit_name(entry):
        # Common formats: {'name': ...} or {'artist': {'name': ...}}
        if not isinstance(entry, dict):
            return None
        direct = entry.get("name")
        if direct:
            return direct
        nested = entry.get("artist")
        if nested and isinstance(nested, dict):
            return nested.get("name") or None
        return None

    return " ".join(found for found in map(_credit_name, ac) if found)
# Suppress noisy loggers (aiohttp, urllib3, etc.)
_NOISY_LOGGER_NAMES = (
    "aiohttp.client",
    "aiohttp.server",
    "aiohttp.access",
    "urllib3",
    "asyncio",
    "chardet",
    "requests.packages.urllib3",
)
for noisy_logger in _NOISY_LOGGER_NAMES:
    _silenced = logging.getLogger(noisy_logger)
    # Only CRITICAL records pass, and nothing is handed on to parent loggers.
    _silenced.setLevel(logging.CRITICAL)
    _silenced.propagate = False
# Also suppress root logger to CRITICAL for anything not our own
logging.getLogger().setLevel(logging.CRITICAL)
async def fetch_srutil_cover(
    sr, artist, song, session: aiohttp.ClientSession, limiter: AsyncRateLimiter
):
    """Fetch cover art through the project's SRUtil wrapper.

    Resolves an album via ``sr.get_album_by_name(artist, song)``, asks for its
    640px cover URL and downloads it. Failures are printed to the console and
    reported as None.

    Returns:
        Image bytes on HTTP 200, otherwise None.
    """
    try:
        album = await sr.get_album_by_name(artist, song)
        album_id = album.get("id") if album else None
        if not album_id:
            return None
        cover_url = await sr.get_cover_by_album_id(album["id"], 640)
        if not cover_url:
            return None
        await limiter.acquire()
        try:
            request_timeout = aiohttp.ClientTimeout(total=15)
            async with session.get(cover_url, timeout=request_timeout) as resp:
                if resp.status != 200:
                    console.print(
                        f"[red]SRUtil: Failed to fetch cover art from URL (status {resp.status}): {cover_url}[/red]"
                    )
                else:
                    return await resp.read()
        except Exception as e:
            console.print(f"[red]SRUtil: Exception fetching cover url: {e}[/red]")
    except Exception as e:
        # One dependency error is reported with a friendlier explanation.
        if "Cannot combine AUTHORIZATION header with AUTH argument" in str(e):
            console.print(
                "[red]SRUtil: Skipping due to conflicting authentication method in dependency (AUTHORIZATION header + AUTH argument).[/red]"
            )
        else:
            console.print(f"[red]SRUtil: Exception: {e}[/red]")
    return None
async def get_isrc(file):
    """Read the ISRC tag of *file* in a worker thread.

    Returns the tag value (the first element when it is a list), or None when
    the tag is missing, empty, or the file cannot be read.
    """

    def _read_isrc():
        tags = load_file(file)
        value = None
        # music_tag may store ISRC under 'isrc' or 'ISRC'
        for key in ("isrc", "ISRC"):
            try:
                value = tags[key].value
                break
            except Exception:
                value = None
        if isinstance(value, list):
            return value[0] if value else None
        return value

    try:
        return await asyncio.to_thread(_read_isrc)
    except Exception as e:
        console.print(f"[red]Error reading ISRC for {file}: {e}[/red]")
        return None
async def search_musicbrainz_by_isrc(session, isrc, limiter: AsyncRateLimiter):
    """Find cover art for a recording identified by its ISRC.

    Looks the ISRC up on MusicBrainz (requesting the recordings' releases via
    ``inc=releases``) and tries the Cover Art Archive 500px front image of the
    first few distinct releases.

    Args:
        session: aiohttp session used for all requests.
        isrc: ISRC string; a falsy value short-circuits to None.
        limiter: Rate limiter awaited before the MusicBrainz request.

    Returns:
        Image bytes of the first cover found, otherwise None.
    """
    if not isrc:
        return None
    from urllib.parse import quote

    # Cap on Cover Art Archive requests made for a single ISRC.
    max_releases = 5
    headers = {"User-Agent": "cover-art-script/1.0"}
    # Use the ISRC lookup endpoint which returns recordings; without
    # inc=releases the recordings carry no release list to look covers up by.
    url = (
        f"https://musicbrainz.org/ws/2/isrc/{quote(str(isrc), safe='')}"
        "?inc=releases&fmt=json"
    )
    try:
        await limiter.acquire()
        timeout = aiohttp.ClientTimeout(total=15)
        async with session.get(url, headers=headers, timeout=timeout) as resp:
            if resp.status != 200:
                return None
            try:
                data = await resp.json()
            except Exception:
                return None
        tried = set()
        for rec in data.get("recordings") or []:
            # try releases tied to this recording
            for rel in rec.get("releases") or []:
                relid = rel.get("id")
                if not relid or relid in tried:
                    continue
                if len(tried) >= max_releases:
                    return None
                tried.add(relid)
                caa_url = f"https://coverartarchive.org/release/{relid}/front-500"
                try:
                    async with session.get(caa_url, timeout=timeout) as caa_resp:
                        if caa_resp.status == 200:
                            console.print(
                                f"[green]Found cover art via ISRC {isrc}[/green]"
                            )
                            return await caa_resp.read()
                except Exception:
                    # A failed download only disqualifies this release.
                    continue
        return None
    except Exception as e:
        console.print(f"[red]MusicBrainz ISRC lookup exception for {isrc}: {e}[/red]")
        return None
# Concurrency limit for async processing
CONCURRENCY = 18


# Helper for formatting failure reasons in a consistent way
def format_failure_reason(e, resp_status=None):
    """Describe a failure as a short string.

    Precedence: timeout exception, aiohttp client error, HTTP status,
    any other truthy *e*, and finally "no match found".
    """
    if isinstance(e, asyncio.TimeoutError):
        return "timeout"
    if isinstance(e, aiohttp.ClientError):
        return f"network error: {str(e)}"
    if resp_status:
        return f"HTTP {resp_status}"
    return str(e) if e else "no match found"
async def process_file(
    file, sr, table, results, sem, session: aiohttp.ClientSession, limiters: dict
):
    """Process a single audio file to find and embed cover art.

    Files that already carry artwork are reported and skipped. Otherwise the
    artwork is taken from the per-album cache or looked up source by source
    (ISRC on MusicBrainz, SRUtil, MusicBrainz, Discogs, Deezer, Spotify,
    iTunes album, iTunes track, Last.fm) until one returns an image, which is
    then embedded. With ONLY_SPOTIFY enabled only Spotify is consulted and the
    cache is not read.

    Args:
        file: Path of the audio file.
        sr: SRUtil instance passed to fetch_srutil_cover().
        table: Rich table receiving one (file, status, source) row.
        results: List receiving the same row for the CSV report.
        sem: Semaphore bounding the number of files processed concurrently.
        session: Shared aiohttp session.
        limiters: Mapping of source name to its AsyncRateLimiter.
    """
    async with sem:
        if await has_cover(file):
            table.add_row(file, "Already Present", "-")
            results.append([file, "Already Present", "-"])
            return
        artist, album, title = await get_artist_album_title(file)
        album_key = (artist, album)
        # Placeholder tags are shared by unrelated untagged files, so their
        # artwork must not be cached under a common key.
        cacheable = artist != "Unknown Artist" and album != "Unknown Album"
        isrc = await get_isrc(file)
        image_bytes = None
        source = None

        def _spotify():
            return search_spotify_cover(
                session, artist, album, limiters["spotify"], isrc
            )

        if ONLY_SPOTIFY:
            # Testing mode: attempt only Spotify
            lookups = [("Spotify", _spotify)]
        else:
            if cacheable:
                image_bytes = ALBUM_ART_CACHE.get(album_key)
                if image_bytes:
                    # A cache hit needs a source label, otherwise the embed
                    # step below would treat the file as having no artwork.
                    source = "Cache"
            # Try ISRC-based lookup first
            if not image_bytes and isrc:
                isrc_label = f"MusicBrainz (ISRC:{isrc})"
                image_bytes = await search_musicbrainz_by_isrc(
                    session, isrc, limiters["musicbrainz"]
                )
                if image_bytes:
                    source = isrc_label
                _log_attempt(
                    artist,
                    album,
                    title,
                    isrc_label,
                    "Success" if image_bytes else "No match",
                )
            # Ordered fallback chain; each entry is (label, coroutine factory).
            lookups = [
                (
                    "SRUtil",
                    lambda: fetch_srutil_cover(
                        sr, artist, album, session, limiters["srutil"]
                    ),
                ),
                (
                    "MusicBrainz",
                    lambda: search_musicbrainz_cover(
                        artist, album, session, limiters["musicbrainz"]
                    ),
                ),
                (
                    "Discogs",
                    lambda: search_discogs_cover(
                        artist, album, session, limiters["discogs"]
                    ),
                ),
                (
                    "Deezer",
                    lambda: search_deezer_cover(
                        session, artist, album, limiters["deezer"]
                    ),
                ),
                ("Spotify", _spotify),
                (
                    "iTunes(album)",
                    lambda: search_itunes_cover(
                        session, artist, album, limiters["itunes"]
                    ),
                ),
                (
                    "iTunes(track)",
                    lambda: search_itunes_track(
                        session, artist, title, limiters["itunes"]
                    ),
                ),
                (
                    "LastFM",
                    lambda: search_lastfm_cover(
                        session, artist, album, limiters["lastfm"]
                    ),
                ),
            ]

        for label, lookup in lookups:
            if image_bytes:
                break
            img = await lookup()
            if img:
                image_bytes = img
                source = label
            _log_attempt(artist, album, title, label, "Success" if img else "No match")

        if image_bytes and cacheable and source != "Cache":
            ALBUM_ART_CACHE[album_key] = image_bytes

        # Embed and summary
        file_basename = os.path.basename(file)
        summary = f"{file_basename} | {artist} / {album}"
        if image_bytes and source:
            if await embed_cover(file, image_bytes):
                status = "Success"
                console.print(
                    f"[green][FINAL RESULT] {summary} | Success via {source}[/green]"
                )
            else:
                status = "Embed Failed"
                console.print(
                    f"[red][FINAL RESULT] {summary} | Embed Failed from {source}[/red]"
                )
        else:
            status = "Failed"
            missing = (
                "No Spotify cover art found" if ONLY_SPOTIFY else "No cover art found"
            )
            console.print(f"[yellow][FINAL RESULT] {summary} | {missing}[/yellow]")
        table.add_row(file, status, source if source else "-")
        results.append([file, status, source if source else "-"])
async def has_cover(file):
    """Return True when *file* already has embedded cover art.

    The tag read runs in a worker thread so the blocking file I/O does not
    stall the event loop. Any failure to read the file counts as "no cover".
    """

    def _check():
        # music_tag stores artwork in 'artwork' which may be a list-like field
        art = load_file(file)["artwork"]
        try:
            # If there is any artwork, consider it present
            return bool(art.first)
        except Exception:
            # fallback if .first not available
            return bool(art)

    try:
        return await asyncio.to_thread(_check)
    except Exception:
        return False
async def get_artist_album_title(file):
    """Extract (artist, album, title) from the tags of *file*.

    The tag read runs in a worker thread so the blocking file I/O does not
    stall the event loop. Missing tags become "Unknown Artist" / "Unknown
    Album" / "Unknown Title"; if the file cannot be read at all, all three
    placeholders are returned.
    """

    def _read():
        tags = load_file(file)

        def _first(key, fallback):
            value = tags[key].first
            return str(value) if value else fallback

        return (
            _first("artist", "Unknown Artist"),
            _first("album", "Unknown Album"),
            _first("title", "Unknown Title"),
        )

    try:
        return await asyncio.to_thread(_read)
    except Exception:
        return "Unknown Artist", "Unknown Album", "Unknown Title"
async def embed_cover(file, image_bytes):
    """Embed *image_bytes* as artwork in *file* using music_tag.

    Loading and saving run in a worker thread so the blocking file I/O does
    not stall the event loop.

    Returns:
        True when the file was saved, False (after printing the error) otherwise.
    """

    def _write():
        tags = load_file(file)
        tags["artwork"] = image_bytes
        tags.save()

    try:
        await asyncio.to_thread(_write)
        return True
    except Exception as e:
        console.print(f"[red][ERROR] Failed to embed cover: {e}[/red]")
        return False
async def main():
    """Scan MUSIC_DIR, embed missing cover art, and write the reports.

    Every audio file under MUSIC_DIR is processed concurrently (bounded by
    CONCURRENCY). A failure in one file is recorded as an "Error" row and does
    not stop the others. A summary table is printed and a CSV report is
    written to REPORT_CSV. Any other unhandled error prints a traceback and
    exits with status 1.
    """
    try:
        console.print(f"[bold blue]Scanning directory: {MUSIC_DIR}[/bold blue]")
        sr = SRUtil()
        results = []
        files = [
            os.path.join(root, fn)
            for root, _, filenames in os.walk(MUSIC_DIR)
            for fn in filenames
            if os.path.splitext(fn)[1].lower() in AUDIO_EXTS
        ]
        table = Table(title="Cover Art Embedding Report")
        table.add_column("File", style="cyan", overflow="fold")
        table.add_column("Status", style="green")
        table.add_column("Source", style="magenta")
        # create rate limiters (seconds between requests)
        rates = {
            "srutil": 0.1,
            "musicbrainz": 1.0,
            "itunes": 0.5,
            "discogs": 1.0,
            "deezer": 0.5,
            "lastfm": 1.0,
            "spotify": 0.5,
        }
        limiters = {name: AsyncRateLimiter(rate) for name, rate in rates.items()}
        sem = asyncio.Semaphore(CONCURRENCY)
        async with aiohttp.ClientSession() as session:
            with Progress(
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TaskProgressColumn(),
                TimeElapsedColumn(),
            ) as progress:
                task_id = progress.add_task("Processing files...", total=len(files))

                async def worker(file):
                    """Process one file, isolating its failure from the rest."""
                    try:
                        await process_file(
                            file, sr, table, results, sem, session, limiters
                        )
                    except Exception as e:
                        console.print(f"[red][ERROR] {file}: {e}[/red]")
                        table.add_row(file, "Error", "-")
                        results.append([file, "Error", "-"])
                    finally:
                        progress.update(task_id, advance=1)

                # Schedule all workers
                await asyncio.gather(*(worker(file) for file in files))
        # Print summary table and CSV after progress bar
        console.print(table)
        # Explicit UTF-8 so file names with non-ASCII characters always serialise.
        with open(REPORT_CSV, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["File", "Status", "Source"])
            writer.writerows(results)
        console.print(f"[bold green]CSV report written to {REPORT_CSV}[/bold green]")
    except Exception as e:
        console.print(f"[red][ERROR] Unhandled exception: {e}[/red]")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())