247 lines
10 KiB
Python
247 lines
10 KiB
Python
from aiohttp import ClientSession, ClientTimeout
|
|
from typing import Optional
|
|
from urllib.parse import urlencode, quote
|
|
import logging
|
|
|
|
class HifiUtil:
|
|
"""
|
|
HiFi API Utility Class
|
|
"""
|
|
def __init__(self) -> None:
|
|
"""Initialize HiFi API utility with base URLs."""
|
|
self.hifi_api_url: str = 'http://127.0.0.1:8000'
|
|
self.hifi_search_url: str = f"{self.hifi_api_url}/search"
|
|
|
|
def dedupe_by_key(self,
|
|
key: str,
|
|
entries: list[dict]) -> list[dict]:
|
|
deduped = {}
|
|
for entry in entries:
|
|
norm = entry[key].strip().lower()
|
|
if norm not in deduped:
|
|
deduped[norm] = entry
|
|
return list(deduped.values())
|
|
|
|
def format_duration(self, seconds):
|
|
if not seconds:
|
|
return None
|
|
m, s = divmod(seconds, 60)
|
|
return f"{m}:{s:02}"
|
|
|
|
async def search(self,
|
|
artist: str,
|
|
song: str = "",
|
|
album: str = "",
|
|
video_name: str = "",
|
|
playlist_name: str = "") -> Optional[list[dict]]:
|
|
"""Search HiFi API
|
|
Args:
|
|
artist (str, required)
|
|
song (str, optional)
|
|
album (str, optional)
|
|
video_name (str, optional)
|
|
playlist_name (str, optional)
|
|
Returns:
|
|
Optional[dict]: Returns the first result from the HiFi API search or None if no results found.
|
|
"""
|
|
async with ClientSession(timeout=ClientTimeout(total=30)) as session:
|
|
params: dict = {
|
|
"a": artist,
|
|
"s": song,
|
|
"al": album,
|
|
"v": video_name,
|
|
"p": playlist_name,
|
|
}
|
|
query: str = urlencode(params, quote_via=quote)
|
|
built_url: str = f"{self.hifi_search_url}?{query}"
|
|
async with session.get(built_url) as response:
|
|
json_response: dict = await response.json()
|
|
if isinstance(json_response, list):
|
|
json_response = json_response[0]
|
|
key = next(iter(json_response), None)
|
|
if not key:
|
|
logging.error("No matching key found in JSON response.")
|
|
return None
|
|
logging.info("Key: %s", key)
|
|
if key not in ["limit"]:
|
|
json_response = json_response[key]
|
|
items: list[dict] = json_response.get("items", [])
|
|
if not items:
|
|
logging.info("No results found.")
|
|
return None
|
|
return items
|
|
|
|
async def _retrieve(self,
|
|
req_type: str,
|
|
id: int,
|
|
quality: str = "LOSSLESS") -> Optional[list|dict]:
|
|
"""Retrieve a specific item by type and ID from the HiFi API.
|
|
Args:
|
|
type (str): The type of item (e.g., 'song', 'album').
|
|
id (int): The ID of the item.
|
|
quality (str): The quality of the item, default is "LOSSLESS". Other options: HIGH, LOW
|
|
Returns:
|
|
Optional[dict]: The item details or None if not found.
|
|
"""
|
|
async with ClientSession(timeout=ClientTimeout(total=10)) as session:
|
|
params: dict = {
|
|
'id': id,
|
|
'quality': quality,
|
|
}
|
|
if req_type not in ["track", "artist", "album", "playlist", "video"]:
|
|
logging.error("Invalid type: %s", type)
|
|
return None
|
|
if req_type in ["artist"]:
|
|
params.pop('id')
|
|
params['f'] = id # For non-track types, use 'f' instead of 'id' for full API output
|
|
query: str = urlencode(params, quote_via=quote)
|
|
built_url: str = f"{self.hifi_api_url}/{req_type}/?{query}"
|
|
logging.info("Built URL: %s", built_url)
|
|
async with session.get(built_url) as response:
|
|
if response.status != 200:
|
|
logging.warning("Item not found: %s %s", req_type, id)
|
|
return None
|
|
response_json: list|dict = await response.json()
|
|
match req_type:
|
|
case "artist":
|
|
response_json = response_json[0]
|
|
if not isinstance(response_json, dict):
|
|
logging.error("Expected a dict but got: %s", type(response_json))
|
|
return None
|
|
response_json_rows = response_json.get('rows')
|
|
if not isinstance(response_json_rows, list):
|
|
logging.error("Expected a list but got: %s", type(response_json_rows))
|
|
return None
|
|
response_json = response_json_rows[0].get('modules')[0].get('pagedList')
|
|
case "album":
|
|
return response_json[1].get('items', [])
|
|
case "track":
|
|
return response_json
|
|
if not isinstance(response_json, dict):
|
|
logging.error("Expected a list but got: %s", type(response_json))
|
|
return None
|
|
return response_json.get('items')
|
|
|
|
async def get_artists_by_name(self,
|
|
artist_name: str) -> Optional[list]:
|
|
"""Get artist(s) by name from HiFi API.
|
|
Args:
|
|
artist_name (str): The name of the artist.
|
|
Returns:
|
|
Optional[dict]: The artist details or None if not found.
|
|
"""
|
|
artists_out: list[dict] = []
|
|
artists = await self.search(artist=artist_name)
|
|
if not artists:
|
|
logging.warning("No artist found for name: %s", artist_name)
|
|
return None
|
|
artists_out = [
|
|
{
|
|
'artist': res['name'],
|
|
'id': res['id'],
|
|
} for res in artists if 'name' in res and 'id' in res
|
|
]
|
|
artists_out = self.dedupe_by_key('artist', artists_out) # Remove duplicates
|
|
return artists_out
|
|
|
|
async def get_albums_by_artist_id(self,
|
|
artist_id: int) -> Optional[list|dict]:
|
|
"""Get albums by artist ID from HiFi API.
|
|
Args:
|
|
artist_id (int): The ID of the artist.
|
|
Returns:
|
|
Optional[list[dict]]: List of albums or None if not found.
|
|
"""
|
|
albums_out: list[dict] = []
|
|
albums = await self._retrieve("artist", artist_id)
|
|
if not albums:
|
|
logging.warning("No albums found for artist ID: %s", artist_id)
|
|
return None
|
|
albums_out = [
|
|
{
|
|
'artist': ", ".join(artist['name'] for artist in album['artists']),
|
|
'album': album['title'],
|
|
'id': album['id'],
|
|
'release_date': album.get('releaseDate', 'Unknown')
|
|
} for album in albums if 'title' in album and 'id' in album and 'artists' in album
|
|
]
|
|
|
|
logging.info("Retrieved albums: %s", albums_out)
|
|
return albums_out
|
|
|
|
async def get_tracks_by_album_id(self,
|
|
album_id: int) -> Optional[list|dict]:
|
|
"""Get tracks by album ID from HiFi API.
|
|
Args:
|
|
album_id (int): The ID of the album.
|
|
Returns:
|
|
Optional[list[dict]]: List of tracks or None if not found.
|
|
"""
|
|
track_list = await self._retrieve("album", album_id)
|
|
if not track_list:
|
|
logging.warning("No tracks found for album ID: %s", album_id)
|
|
return None
|
|
tracks_out: list[dict] = [
|
|
{
|
|
'id': track.get('item').get('id'),
|
|
'artist': track.get('item').get('artist').get('name'),
|
|
'title': track.get('item').get('title'),
|
|
'duration': self.format_duration(track.get('item').get('duration', 0)),
|
|
'version': track.get('item').get('version'),
|
|
'audioQuality': track.get('item').get('audioQuality'),
|
|
} for track in track_list
|
|
]
|
|
return tracks_out
|
|
|
|
|
|
async def get_tracks_by_artist_song(self,
|
|
artist: str,
|
|
song: str) -> Optional[list]:
|
|
"""Get track by artist and song name from HiFi API.
|
|
Args:
|
|
artist (str): The name of the artist.
|
|
song (str): The name of the song.
|
|
Returns:
|
|
Optional[dict]: The track details or None if not found.
|
|
"""
|
|
tracks_out: list[dict] = []
|
|
tracks = await self.search(artist=artist, song=song)
|
|
if not tracks:
|
|
logging.warning("No track found for artist: %s, song: %s", artist, song)
|
|
return None
|
|
tracks_out = [
|
|
{
|
|
'artist': ", ".join(artist['name'] for artist in track['artists']),
|
|
'song': track['title'],
|
|
'id': track['id'],
|
|
'album': track.get('album', {}).get('title', 'Unknown'),
|
|
'duration': track.get('duration', 0)
|
|
} for track in tracks if 'title' in track and 'id' in track and 'artists' in track
|
|
]
|
|
if not tracks_out:
|
|
logging.warning("No valid tracks found after processing.")
|
|
return None
|
|
logging.info("Retrieved tracks: %s", tracks_out)
|
|
return tracks_out
|
|
|
|
async def get_stream_url_by_track_id(self,
|
|
track_id: int,
|
|
quality: str = "LOSSLESS") -> Optional[str]:
|
|
"""Get stream URL by track ID from HiFi API.
|
|
Args:
|
|
track_id (int): The ID of the track.
|
|
quality (str): The quality of the stream, default is "LOSSLESS". Other options: HIGH, LOW
|
|
Returns:
|
|
Optional[str]: The stream URL or None if not found.
|
|
"""
|
|
track = await self._retrieve("track", track_id, quality)
|
|
if not isinstance(track, list) :
|
|
logging.warning("No track found for ID: %s", track_id)
|
|
return None
|
|
stream_url = track[2].get('OriginalTrackUrl')
|
|
if not stream_url:
|
|
logging.warning("No stream URL found for track ID: %s", track_id)
|
|
return None
|
|
logging.info("Retrieved stream URL: %s", stream_url)
|
|
return stream_url
|
|
|