formatting / CORS changes
This commit is contained in:
4
.gitignore
vendored
4
.gitignore
vendored
@@ -21,5 +21,9 @@ get_next_track.py
|
||||
endpoints/radio.py
|
||||
utils/radio_util.py
|
||||
redis_playlist.py
|
||||
endpoints/auth.py
|
||||
endpoints/radio2
|
||||
endpoints/radio2/**
|
||||
hash_password.py
|
||||
**/auth/*
|
||||
.gitignore
|
||||
|
8
base.py
8
base.py
@@ -38,10 +38,11 @@ app.add_middleware(
|
||||
CORSMiddleware, # type: ignore
|
||||
allow_origins=origins,
|
||||
allow_credentials=True,
|
||||
allow_methods=["POST", "GET", "HEAD"],
|
||||
allow_methods=["POST", "GET", "HEAD", "OPTIONS"],
|
||||
allow_headers=["*"],
|
||||
) # type: ignore
|
||||
|
||||
|
||||
"""
|
||||
Blacklisted routes
|
||||
"""
|
||||
@@ -98,9 +99,8 @@ routes: dict = {
|
||||
app, util, constants, loop
|
||||
),
|
||||
"meme": importlib.import_module("endpoints.meme").Meme(app, util, constants),
|
||||
"trip": importlib.import_module("endpoints.rip").RIP(
|
||||
app, util, constants
|
||||
),
|
||||
"trip": importlib.import_module("endpoints.rip").RIP(app, util, constants),
|
||||
"auth": importlib.import_module("endpoints.auth").Auth(app),
|
||||
}
|
||||
|
||||
# Misc endpoint depends on radio endpoint instance
|
||||
|
@@ -23,7 +23,9 @@ class Meme(FastAPI):
|
||||
for endpoint, handler in self.endpoints.items():
|
||||
dependencies = None
|
||||
if endpoint == "memes/list_memes":
|
||||
dependencies = [Depends(RateLimiter(times=10, seconds=2))] # Do not rate limit image retrievals (cached)
|
||||
dependencies = [
|
||||
Depends(RateLimiter(times=10, seconds=2))
|
||||
] # Do not rate limit image retrievals (cached)
|
||||
app.add_api_route(
|
||||
f"/{endpoint}",
|
||||
handler,
|
||||
|
@@ -3,9 +3,11 @@ from fastapi import FastAPI, Request, Response, Depends
|
||||
from fastapi_throttle import RateLimiter
|
||||
from fastapi.responses import JSONResponse
|
||||
from utils.hifi_wrapper import HifiUtil
|
||||
from auth.deps import get_current_user
|
||||
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
|
||||
class RIP(FastAPI):
|
||||
"""
|
||||
Ripping Endpoints
|
||||
@@ -25,7 +27,9 @@ class RIP(FastAPI):
|
||||
}
|
||||
|
||||
for endpoint, handler in self.endpoints.items():
|
||||
dependencies = [Depends(RateLimiter(times=8, seconds=2))] # Do not rate limit image retrievals (cached)
|
||||
dependencies = [
|
||||
Depends(RateLimiter(times=8, seconds=2))
|
||||
] # Do not rate limit image retrievals (cached)
|
||||
app.add_api_route(
|
||||
f"/{endpoint}",
|
||||
handler,
|
||||
@@ -34,28 +38,36 @@ class RIP(FastAPI):
|
||||
dependencies=dependencies,
|
||||
)
|
||||
|
||||
async def artists_by_name_handler(self, artist: str, request: Request) -> Response:
|
||||
async def artists_by_name_handler(
|
||||
self, artist: str, request: Request, user=Depends(get_current_user)
|
||||
) -> Response:
|
||||
"""Get artists by name"""
|
||||
artists = await self.trip_util.get_artists_by_name(artist)
|
||||
if not artists:
|
||||
return Response(status_code=404, content="Not found")
|
||||
return JSONResponse(content=artists)
|
||||
|
||||
async def albums_by_artist_id_handler(self, artist_id: int, request: Request) -> Response:
|
||||
async def albums_by_artist_id_handler(
|
||||
self, artist_id: int, request: Request, user=Depends(get_current_user)
|
||||
) -> Response:
|
||||
"""Get albums by artist ID"""
|
||||
albums = await self.trip_util.get_albums_by_artist_id(artist_id)
|
||||
if not albums:
|
||||
return Response(status_code=404, content="Not found")
|
||||
return JSONResponse(content=albums)
|
||||
|
||||
async def tracks_by_album_id_handler(self, album_id: int, request: Request) -> Response:
|
||||
async def tracks_by_album_id_handler(
|
||||
self, album_id: int, request: Request, user=Depends(get_current_user)
|
||||
) -> Response:
|
||||
"""Get tracks by album id"""
|
||||
tracks = await self.trip_util.get_tracks_by_album_id(album_id)
|
||||
if not tracks:
|
||||
return Response(status_code=404, content="Not Found")
|
||||
return JSONResponse(content=tracks)
|
||||
|
||||
async def tracks_by_artist_song_handler(self, artist: str, song: str, request: Request) -> Response:
|
||||
async def tracks_by_artist_song_handler(
|
||||
self, artist: str, song: str, request: Request, user=Depends(get_current_user)
|
||||
) -> Response:
|
||||
"""Get tracks by artist and song name"""
|
||||
logging.critical("Searching for tracks by artist: %s, song: %s", artist, song)
|
||||
tracks = await self.trip_util.get_tracks_by_artist_song(artist, song)
|
||||
@@ -63,7 +75,9 @@ class RIP(FastAPI):
|
||||
return Response(status_code=404, content="Not found")
|
||||
return JSONResponse(content=tracks)
|
||||
|
||||
async def track_by_id_handler(self, track_id: int, request: Request) -> Response:
|
||||
async def track_by_id_handler(
|
||||
self, track_id: int, request: Request, user=Depends(get_current_user)
|
||||
) -> Response:
|
||||
"""Get track by ID"""
|
||||
track = await self.trip_util.get_stream_url_by_track_id(track_id)
|
||||
if not track:
|
||||
|
@@ -111,7 +111,8 @@ class DataUtils:
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.lrc_regex = regex.compile( # capture mm:ss and optional .xxx, then the lyric text
|
||||
self.lrc_regex = (
|
||||
regex.compile( # capture mm:ss and optional .xxx, then the lyric text
|
||||
r"""
|
||||
\[ # literal “[”
|
||||
( # 1st (and only) capture group:
|
||||
@@ -125,6 +126,7 @@ class DataUtils:
|
||||
""",
|
||||
regex.VERBOSE,
|
||||
)
|
||||
)
|
||||
self.scrub_regex_1: Pattern = regex.compile(r"(\[.*?\])(\s){0,}(\:){0,1}")
|
||||
self.scrub_regex_2: Pattern = regex.compile(
|
||||
r"(\d?)(Embed\b)", flags=regex.IGNORECASE
|
||||
|
@@ -3,18 +3,18 @@ from typing import Optional
|
||||
from urllib.parse import urlencode, quote
|
||||
import logging
|
||||
|
||||
|
||||
class HifiUtil:
|
||||
"""
|
||||
HiFi API Utility Class
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize HiFi API utility with base URLs."""
|
||||
self.hifi_api_url: str = 'http://127.0.0.1:8000'
|
||||
self.hifi_api_url: str = "http://127.0.0.1:8000"
|
||||
self.hifi_search_url: str = f"{self.hifi_api_url}/search"
|
||||
|
||||
def dedupe_by_key(self,
|
||||
key: str,
|
||||
entries: list[dict]) -> list[dict]:
|
||||
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
|
||||
deduped = {}
|
||||
for entry in entries:
|
||||
norm = entry[key].strip().lower()
|
||||
@@ -28,12 +28,14 @@ class HifiUtil:
|
||||
m, s = divmod(seconds, 60)
|
||||
return f"{m}:{s:02}"
|
||||
|
||||
async def search(self,
|
||||
async def search(
|
||||
self,
|
||||
artist: str,
|
||||
song: str = "",
|
||||
album: str = "",
|
||||
video_name: str = "",
|
||||
playlist_name: str = "") -> Optional[list[dict]]:
|
||||
playlist_name: str = "",
|
||||
) -> Optional[list[dict]]:
|
||||
"""Search HiFi API
|
||||
Args:
|
||||
artist (str, required)
|
||||
@@ -71,10 +73,9 @@ class HifiUtil:
|
||||
return None
|
||||
return items
|
||||
|
||||
async def _retrieve(self,
|
||||
req_type: str,
|
||||
id: int,
|
||||
quality: str = "LOSSLESS") -> Optional[list|dict]:
|
||||
async def _retrieve(
|
||||
self, req_type: str, id: int, quality: str = "LOSSLESS"
|
||||
) -> Optional[list | dict]:
|
||||
"""Retrieve a specific item by type and ID from the HiFi API.
|
||||
Args:
|
||||
type (str): The type of item (e.g., 'song', 'album').
|
||||
@@ -85,15 +86,17 @@ class HifiUtil:
|
||||
"""
|
||||
async with ClientSession(timeout=ClientTimeout(total=10)) as session:
|
||||
params: dict = {
|
||||
'id': id,
|
||||
'quality': quality,
|
||||
"id": id,
|
||||
"quality": quality,
|
||||
}
|
||||
if req_type not in ["track", "artist", "album", "playlist", "video"]:
|
||||
logging.error("Invalid type: %s", type)
|
||||
return None
|
||||
if req_type in ["artist"]:
|
||||
params.pop('id')
|
||||
params['f'] = id # For non-track types, use 'f' instead of 'id' for full API output
|
||||
params.pop("id")
|
||||
params["f"] = (
|
||||
id # For non-track types, use 'f' instead of 'id' for full API output
|
||||
)
|
||||
query: str = urlencode(params, quote_via=quote)
|
||||
built_url: str = f"{self.hifi_api_url}/{req_type}/?{query}"
|
||||
logging.info("Built URL: %s", built_url)
|
||||
@@ -106,24 +109,29 @@ class HifiUtil:
|
||||
case "artist":
|
||||
response_json = response_json[0]
|
||||
if not isinstance(response_json, dict):
|
||||
logging.error("Expected a dict but got: %s", type(response_json))
|
||||
logging.error(
|
||||
"Expected a dict but got: %s", type(response_json)
|
||||
)
|
||||
return None
|
||||
response_json_rows = response_json.get('rows')
|
||||
response_json_rows = response_json.get("rows")
|
||||
if not isinstance(response_json_rows, list):
|
||||
logging.error("Expected a list but got: %s", type(response_json_rows))
|
||||
logging.error(
|
||||
"Expected a list but got: %s", type(response_json_rows)
|
||||
)
|
||||
return None
|
||||
response_json = response_json_rows[0].get('modules')[0].get('pagedList')
|
||||
response_json = (
|
||||
response_json_rows[0].get("modules")[0].get("pagedList")
|
||||
)
|
||||
case "album":
|
||||
return response_json[1].get('items', [])
|
||||
return response_json[1].get("items", [])
|
||||
case "track":
|
||||
return response_json
|
||||
if not isinstance(response_json, dict):
|
||||
logging.error("Expected a list but got: %s", type(response_json))
|
||||
return None
|
||||
return response_json.get('items')
|
||||
return response_json.get("items")
|
||||
|
||||
async def get_artists_by_name(self,
|
||||
artist_name: str) -> Optional[list]:
|
||||
async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
|
||||
"""Get artist(s) by name from HiFi API.
|
||||
Args:
|
||||
artist_name (str): The name of the artist.
|
||||
@@ -137,15 +145,16 @@ class HifiUtil:
|
||||
return None
|
||||
artists_out = [
|
||||
{
|
||||
'artist': res['name'],
|
||||
'id': res['id'],
|
||||
} for res in artists if 'name' in res and 'id' in res
|
||||
"artist": res["name"],
|
||||
"id": res["id"],
|
||||
}
|
||||
for res in artists
|
||||
if "name" in res and "id" in res
|
||||
]
|
||||
artists_out = self.dedupe_by_key('artist', artists_out) # Remove duplicates
|
||||
artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates
|
||||
return artists_out
|
||||
|
||||
async def get_albums_by_artist_id(self,
|
||||
artist_id: int) -> Optional[list|dict]:
|
||||
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
|
||||
"""Get albums by artist ID from HiFi API.
|
||||
Args:
|
||||
artist_id (int): The ID of the artist.
|
||||
@@ -159,18 +168,19 @@ class HifiUtil:
|
||||
return None
|
||||
albums_out = [
|
||||
{
|
||||
'artist': ", ".join(artist['name'] for artist in album['artists']),
|
||||
'album': album['title'],
|
||||
'id': album['id'],
|
||||
'release_date': album.get('releaseDate', 'Unknown')
|
||||
} for album in albums if 'title' in album and 'id' in album and 'artists' in album
|
||||
"artist": ", ".join(artist["name"] for artist in album["artists"]),
|
||||
"album": album["title"],
|
||||
"id": album["id"],
|
||||
"release_date": album.get("releaseDate", "Unknown"),
|
||||
}
|
||||
for album in albums
|
||||
if "title" in album and "id" in album and "artists" in album
|
||||
]
|
||||
|
||||
logging.info("Retrieved albums: %s", albums_out)
|
||||
return albums_out
|
||||
|
||||
async def get_tracks_by_album_id(self,
|
||||
album_id: int) -> Optional[list|dict]:
|
||||
async def get_tracks_by_album_id(self, album_id: int) -> Optional[list | dict]:
|
||||
"""Get tracks by album ID from HiFi API.
|
||||
Args:
|
||||
album_id (int): The ID of the album.
|
||||
@@ -183,20 +193,18 @@ class HifiUtil:
|
||||
return None
|
||||
tracks_out: list[dict] = [
|
||||
{
|
||||
'id': track.get('item').get('id'),
|
||||
'artist': track.get('item').get('artist').get('name'),
|
||||
'title': track.get('item').get('title'),
|
||||
'duration': self.format_duration(track.get('item').get('duration', 0)),
|
||||
'version': track.get('item').get('version'),
|
||||
'audioQuality': track.get('item').get('audioQuality'),
|
||||
} for track in track_list
|
||||
"id": track.get("item").get("id"),
|
||||
"artist": track.get("item").get("artist").get("name"),
|
||||
"title": track.get("item").get("title"),
|
||||
"duration": self.format_duration(track.get("item").get("duration", 0)),
|
||||
"version": track.get("item").get("version"),
|
||||
"audioQuality": track.get("item").get("audioQuality"),
|
||||
}
|
||||
for track in track_list
|
||||
]
|
||||
return tracks_out
|
||||
|
||||
|
||||
async def get_tracks_by_artist_song(self,
|
||||
artist: str,
|
||||
song: str) -> Optional[list]:
|
||||
async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
|
||||
"""Get track by artist and song name from HiFi API.
|
||||
Args:
|
||||
artist (str): The name of the artist.
|
||||
@@ -211,12 +219,14 @@ class HifiUtil:
|
||||
return None
|
||||
tracks_out = [
|
||||
{
|
||||
'artist': ", ".join(artist['name'] for artist in track['artists']),
|
||||
'song': track['title'],
|
||||
'id': track['id'],
|
||||
'album': track.get('album', {}).get('title', 'Unknown'),
|
||||
'duration': track.get('duration', 0)
|
||||
} for track in tracks if 'title' in track and 'id' in track and 'artists' in track
|
||||
"artist": ", ".join(artist["name"] for artist in track["artists"]),
|
||||
"song": track["title"],
|
||||
"id": track["id"],
|
||||
"album": track.get("album", {}).get("title", "Unknown"),
|
||||
"duration": track.get("duration", 0),
|
||||
}
|
||||
for track in tracks
|
||||
if "title" in track and "id" in track and "artists" in track
|
||||
]
|
||||
if not tracks_out:
|
||||
logging.warning("No valid tracks found after processing.")
|
||||
@@ -224,9 +234,9 @@ class HifiUtil:
|
||||
logging.info("Retrieved tracks: %s", tracks_out)
|
||||
return tracks_out
|
||||
|
||||
async def get_stream_url_by_track_id(self,
|
||||
track_id: int,
|
||||
quality: str = "LOSSLESS") -> Optional[str]:
|
||||
async def get_stream_url_by_track_id(
|
||||
self, track_id: int, quality: str = "LOSSLESS"
|
||||
) -> Optional[str]:
|
||||
"""Get stream URL by track ID from HiFi API.
|
||||
Args:
|
||||
track_id (int): The ID of the track.
|
||||
@@ -238,10 +248,9 @@ class HifiUtil:
|
||||
if not isinstance(track, list):
|
||||
logging.warning("No track found for ID: %s", track_id)
|
||||
return None
|
||||
stream_url = track[2].get('OriginalTrackUrl')
|
||||
stream_url = track[2].get("OriginalTrackUrl")
|
||||
if not stream_url:
|
||||
logging.warning("No stream URL found for track ID: %s", track_id)
|
||||
return None
|
||||
logging.info("Retrieved stream URL: %s", stream_url)
|
||||
return stream_url
|
||||
|
Reference in New Issue
Block a user