Files
api/utils/hifi_wrapper.py
2025-08-09 07:48:07 -04:00

257 lines
10 KiB
Python

from aiohttp import ClientSession, ClientTimeout
from typing import Optional
from urllib.parse import urlencode, quote
import logging
class HifiUtil:
"""
HiFi API Utility Class
"""
def __init__(self) -> None:
"""Initialize HiFi API utility with base URLs."""
self.hifi_api_url: str = "http://127.0.0.1:8000"
self.hifi_search_url: str = f"{self.hifi_api_url}/search"
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
deduped = {}
for entry in entries:
norm = entry[key].strip().lower()
if norm not in deduped:
deduped[norm] = entry
return list(deduped.values())
def format_duration(self, seconds):
if not seconds:
return None
m, s = divmod(seconds, 60)
return f"{m}:{s:02}"
async def search(
self,
artist: str,
song: str = "",
album: str = "",
video_name: str = "",
playlist_name: str = "",
) -> Optional[list[dict]]:
"""Search HiFi API
Args:
artist (str, required)
song (str, optional)
album (str, optional)
video_name (str, optional)
playlist_name (str, optional)
Returns:
Optional[dict]: Returns the first result from the HiFi API search or None if no results found.
"""
async with ClientSession(timeout=ClientTimeout(total=30)) as session:
params: dict = {
"a": artist,
"s": song,
"al": album,
"v": video_name,
"p": playlist_name,
}
query: str = urlencode(params, quote_via=quote)
built_url: str = f"{self.hifi_search_url}?{query}"
async with session.get(built_url) as response:
json_response: dict = await response.json()
if isinstance(json_response, list):
json_response = json_response[0]
key = next(iter(json_response), None)
if not key:
logging.error("No matching key found in JSON response.")
return None
logging.info("Key: %s", key)
if key not in ["limit"]:
json_response = json_response[key]
items: list[dict] = json_response.get("items", [])
if not items:
logging.info("No results found.")
return None
return items
async def _retrieve(
self, req_type: str, id: int, quality: str = "LOSSLESS"
) -> Optional[list | dict]:
"""Retrieve a specific item by type and ID from the HiFi API.
Args:
type (str): The type of item (e.g., 'song', 'album').
id (int): The ID of the item.
quality (str): The quality of the item, default is "LOSSLESS". Other options: HIGH, LOW
Returns:
Optional[dict]: The item details or None if not found.
"""
async with ClientSession(timeout=ClientTimeout(total=10)) as session:
params: dict = {
"id": id,
"quality": quality,
}
if req_type not in ["track", "artist", "album", "playlist", "video"]:
logging.error("Invalid type: %s", type)
return None
if req_type in ["artist"]:
params.pop("id")
params["f"] = (
id # For non-track types, use 'f' instead of 'id' for full API output
)
query: str = urlencode(params, quote_via=quote)
built_url: str = f"{self.hifi_api_url}/{req_type}/?{query}"
logging.info("Built URL: %s", built_url)
async with session.get(built_url) as response:
if response.status != 200:
logging.warning("Item not found: %s %s", req_type, id)
return None
response_json: list | dict = await response.json()
match req_type:
case "artist":
response_json = response_json[0]
if not isinstance(response_json, dict):
logging.error(
"Expected a dict but got: %s", type(response_json)
)
return None
response_json_rows = response_json.get("rows")
if not isinstance(response_json_rows, list):
logging.error(
"Expected a list but got: %s", type(response_json_rows)
)
return None
response_json = (
response_json_rows[0].get("modules")[0].get("pagedList")
)
case "album":
return response_json[1].get("items", [])
case "track":
return response_json
if not isinstance(response_json, dict):
logging.error("Expected a list but got: %s", type(response_json))
return None
return response_json.get("items")
async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
"""Get artist(s) by name from HiFi API.
Args:
artist_name (str): The name of the artist.
Returns:
Optional[dict]: The artist details or None if not found.
"""
artists_out: list[dict] = []
artists = await self.search(artist=artist_name)
if not artists:
logging.warning("No artist found for name: %s", artist_name)
return None
artists_out = [
{
"artist": res["name"],
"id": res["id"],
}
for res in artists
if "name" in res and "id" in res
]
artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates
return artists_out
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
"""Get albums by artist ID from HiFi API.
Args:
artist_id (int): The ID of the artist.
Returns:
Optional[list[dict]]: List of albums or None if not found.
"""
albums_out: list[dict] = []
albums = await self._retrieve("artist", artist_id)
if not albums:
logging.warning("No albums found for artist ID: %s", artist_id)
return None
albums_out = [
{
"artist": ", ".join(artist["name"] for artist in album["artists"]),
"album": album["title"],
"id": album["id"],
"release_date": album.get("releaseDate", "Unknown"),
}
for album in albums
if "title" in album and "id" in album and "artists" in album
]
logging.info("Retrieved albums: %s", albums_out)
return albums_out
async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]:
"""Get tracks by album ID from HiFi API.
Args:
album_id (int): The ID of the album.
Returns:
Optional[list[dict]]: List of tracks or None if not found.
"""
track_list = await self._retrieve("album", album_id)
if not track_list:
logging.warning("No tracks found for album ID: %s", album_id)
return None
tracks_out: list[dict] = [
{
"id": track.get("item").get("id"),
"artist": track.get("item").get("artist").get("name"),
"title": track.get("item").get("title"),
"duration": self.format_duration(track.get("item").get("duration", 0)),
"version": track.get("item").get("version"),
"audioQuality": track.get("item").get("audioQuality"),
}
for track in track_list
]
return tracks_out
async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
"""Get track by artist and song name from HiFi API.
Args:
artist (str): The name of the artist.
song (str): The name of the song.
Returns:
Optional[dict]: The track details or None if not found.
"""
tracks_out: list[dict] = []
tracks = await self.search(artist=artist, song=song)
if not tracks:
logging.warning("No track found for artist: %s, song: %s", artist, song)
return None
tracks_out = [
{
"artist": ", ".join(artist["name"] for artist in track["artists"]),
"song": track["title"],
"id": track["id"],
"album": track.get("album", {}).get("title", "Unknown"),
"duration": track.get("duration", 0),
}
for track in tracks
if "title" in track and "id" in track and "artists" in track
]
if not tracks_out:
logging.warning("No valid tracks found after processing.")
return None
logging.info("Retrieved tracks: %s", tracks_out)
return tracks_out
async def get_stream_url_by_track_id(
self, track_id: int, quality: str = "LOSSLESS"
) -> Optional[str]:
"""Get stream URL by track ID from HiFi API.
Args:
track_id (int): The ID of the track.
quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
Returns:
Optional[str]: The stream URL or None if not found.
"""
track = await self._retrieve("track", track_id, quality)
if not isinstance(track, list):
logging.warning("No track found for ID: %s", track_id)
return None
stream_url = track[2].get("OriginalTrackUrl")
if not stream_url:
logging.warning("No stream URL found for track ID: %s", track_id)
return None
logging.info("Retrieved stream URL: %s", stream_url)
return stream_url