#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught, invalid-name
"""Radio endpoints: now-playing info, play queue, song requests and skipping."""

import logging
import traceback
import os
import aiosqlite as sqlite3
import time
import asyncio
import regex
import music_tag
from uuid import uuid4 as uuid
from pydantic import BaseModel
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import RedirectResponse
from aiohttp import ClientSession, ClientTimeout

# Collapses runs of two or more whitespace characters into a single space.
double_space = regex.compile(r'\s{2,}')

# Fallback image served when no album art can be extracted for the current track.
DEFAULT_ALBUM_ART_URL = "https://codey.lol/images/radio_art_default.jpg"


class RadioException(Exception):
    """Raised when a playlist search is invoked with an invalid query."""


class ValidRadioSongRequest(BaseModel):
    """
    - **key**: API Key
    - **artist**: artist to search
    - **song**: song to search
    - **artistsong**: may be used IN PLACE OF artist/song to perform a combined/string search in the format "artist - song"
    - **alsoSkip**: Whether to skip immediately to this track once it has been queued
    """

    key: str
    artist: str | None = None
    song: str | None = None
    artistsong: str | None = None
    alsoSkip: bool = False


class ValidRadioNextRequest(BaseModel):
    """
    - **key**: API Key
    """

    key: str


class Radio(FastAPI):
    """Radio Endpoints"""

    def __init__(self, app: FastAPI, my_util, constants, glob_state):  # pylint: disable=super-init-not-called
        """
        Register the radio routes on `app` and load the initial playlist.

        Args:
            app: The FastAPI application the routes are attached to.
            my_util: Utility object; must provide `check_key(path=, req_type=, key=)`.
            constants: Project constants object (stored, not used in this class).
            glob_state: Shared global state object (stored, not used in this class).
        """
        self.app = app
        self.util = my_util
        self.constants = constants
        self.glob_state = glob_state
        # Base URI of the streaming backend that accepts "/next" to skip a track.
        # NOTE(review): "ls" presumably stands for Liquidsoap -- confirm.
        self.ls_uri = "http://10.10.10.101:29000"
        # SQLite extensions loaded for searches (spellfix1 provides editdist3).
        self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
        self.active_playlist_path = os.path.join("/usr/local/share", "sqlite_dbs", "track_file_map.db")
        self.active_playlist_name = "default"  # not used
        self.active_playlist: list[dict] = []
        self.now_playing = {
            'artist': 'N/A',
            'song': 'N/A',
            'artistsong': 'N/A - N/A',
            'duration': 0,
            'start': 0,
            'end': 0,
            'file_path': None,
        }
        self.endpoints = {
            "radio/np": self.radio_now_playing,
            "radio/request": self.radio_request,
            "radio/get_queue": self.radio_get_queue,
            "radio/skip": self.radio_skip,
            # "widget/sqlite": self.homepage_sqlite_widget,
            # "widget/lyrics": self.homepage_lyrics_widget,
            # "widget/radio": self.homepage_radio_widget,
        }

        for endpoint, handler in self.endpoints.items():
            app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
                              include_in_schema=True)  # change include_in_schema to False

        # NOTE: Not in loop because method is GET for this endpoint
        app.add_api_route("/radio/album_art", self.album_art_handler, methods=["GET"],
                          include_in_schema=True)

        # NOTE: Not in loop because include_in_schema is False for this endpoint, private
        app.add_api_route("/radio/ls._next_", self.radio_get_next, methods=["POST"],
                          include_in_schema=False)

        # Populate the playlist synchronously at construction time.
        asyncio.get_event_loop().run_until_complete(self.load_playlist())

    async def get_queue_item_by_uuid(self, uuid: str) -> tuple[int, dict] | None:
        """
        Get queue item by UUID

        Args:
            uuid: The UUID to search
        Returns:
            tuple[int, dict] | None: (position, item) of the first match,
            or None when no queue item carries that UUID.
        """
        for position, item in enumerate(self.active_playlist):
            if item.get('uuid') == uuid:
                return (position, item)
        return None

    async def _ls_skip(self) -> bool:
        """
        Ask the streaming backend to skip to the next track.

        Returns:
            bool: True when the backend answered with the body "OK".
        Raises:
            aiohttp.ClientError: On connection failure or a non-2xx response.
        """
        async with ClientSession() as session:
            async with session.get(f"{self.ls_uri}/next",
                                   timeout=ClientTimeout(connect=2, sock_read=2)) as response:
                response.raise_for_status()
                text = await response.text()
                return text == "OK"

    async def radio_skip(self, data: ValidRadioNextRequest, request: Request) -> bool:
        """
        Skip to the next track in the queue

        Args:
            data: Request body carrying the API key.
            request: Incoming request; its path is passed to the key check.
        Returns:
            bool: True if the backend confirmed the skip, False on any failure.
        Raises:
            HTTPException: 403 when the API key is rejected.
        """
        # Authorization is checked outside the try block so that the 403 is
        # not swallowed by the broad handler below.
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")
        try:
            return await self._ls_skip()
        except Exception:
            traceback.print_exc()
            return False

    async def radio_get_queue(self, request: Request, limit: int = 100) -> dict:
        """
        Get current play queue, up to limit n [default: 100]

        Args:
            request: Incoming request (unused).
            limit (int): Number of results to return (default 100)
        Returns:
            dict: {'items': [...]} where each entry holds pos, uuid, artist,
            song, artistsong and duration (file paths are not exposed).
        """
        # Clamp so a negative limit yields an empty list rather than slicing
        # from the end of the playlist.
        upper = max(limit, 0)
        queue_out = [
            {
                'pos': position,
                'uuid': item.get('uuid'),
                'artist': item.get('artist'),
                'song': item.get('song'),
                'artistsong': item.get('artistsong'),
                'duration': item.get('duration'),
            }
            for position, item in enumerate(self.active_playlist[:upper])
        ]
        return {
            'items': queue_out
        }

    async def search_playlist(self, artistsong: str | None = None, artist: str | None = None,
                              song: str | None = None) -> bool:
        """
        Fuzzy-search the track database and push the best match to the front of the queue.

        Either `artistsong` ("artist - song") or both `artist` and `song`
        must be given, never a mix of the two forms.

        Args:
            artistsong: Combined query in the format "artist - song".
            artist: Artist to search.
            song: Song to search.
        Returns:
            bool: True if a track was found and queued, False otherwise
            (including on any database error).
        Raises:
            RadioException: When the argument combination is invalid.
        """
        if artistsong and (artist or song):
            raise RadioException("Cannot search using combination provided")
        if not artistsong and (not artist or not song):
            raise RadioException("No query provided")
        try:
            # editdist3 comes from the spellfix1 extension loaded below; rows
            # whose edit distance exceeds 410 are not considered a match.
            search_query = (
                "SELECT id, artist, song, (artist || ' - ' || song) AS artistsong, file_path, duration "
                "FROM tracks "
                "WHERE editdist3((lower(artist) || ' ' || lower(song)), (? || ' ' || ?)) <= 410 "
                "ORDER BY editdist3((lower(artist) || ' ' || lower(song)), ?) ASC LIMIT 1"
            )
            if artistsong:
                search_artist, separator, search_song = artistsong.partition(" - ")
                if not separator:
                    # Without the " - " separator the query cannot be split
                    # into an artist part and a song part.
                    logging.info("search_playlist:: Query not in 'artist - song' format: %s",
                                 artistsong)
                    return False
            else:
                search_artist = artist
                search_song = song
                artistsong = f"{search_artist} - {search_song}"
            search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)
            async with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn:
                await db_conn.enable_load_extension(True)
                for ext in self.sqlite_exts:
                    await db_conn.load_extension(ext)
                db_conn.row_factory = sqlite3.Row
                async with await db_conn.execute(search_query, search_params) as db_cursor:
                    result = await db_cursor.fetchone()
                    if not result:
                        return False
                    push_obj = {
                        'uuid': uuid().hex,
                        'artist': result['artist'].strip(),
                        'song': result['song'].strip(),
                        'artistsong': result['artistsong'].strip(),
                        'file_path': result['file_path'],
                        'duration': result['duration'],
                    }
                    # Requests jump the queue: insert at the very front.
                    self.active_playlist.insert(0, push_obj)
                    return True
        except Exception as e:
            logging.critical("search_playlist:: Search error occurred: %s", str(e))
            traceback.print_exc()
            return False

    async def load_playlist(self):
        """
        (Re)build the active playlist from the track database in random order.

        Each distinct "artist - song" pair appears once and gets a fresh UUID.
        Errors are printed and otherwise ignored; on failure the previous
        playlist is left untouched.
        """
        try:
            logging.critical("Loading playlist...")
            db_query = (
                "SELECT distinct(artist || ' - ' || song) AS artistdashsong, id, artist, song, "
                "file_path, duration FROM tracks "
                "GROUP BY artistdashsong ORDER BY RANDOM()"
            )
            async with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn:
                db_conn.row_factory = sqlite3.Row
                async with await db_conn.execute(db_query) as db_cursor:
                    results = await db_cursor.fetchall()
                    # Replace the playlist only once the query has succeeded,
                    # so a failed reload does not wipe the current queue.
                    self.active_playlist = [{
                        'uuid': uuid().hex,
                        'artist': double_space.sub(' ', r['artist']).strip(),
                        'song': double_space.sub(' ', r['song']).strip(),
                        'artistsong': double_space.sub(' ', r['artistdashsong']).strip(),
                        'file_path': r['file_path'],
                        'duration': r['duration'],
                    } for r in results]
                    logging.info("Populated active playlists with %s items",
                                 len(self.active_playlist))
        except Exception:
            traceback.print_exc()

    # TODO: Optimize/cache
    async def album_art_handler(self, request: Request) -> Response:
        """
        Serve the album art embedded in the currently playing file.

        Args:
            request: Incoming request (unused).
        Returns:
            Response: The artwork bytes with their MIME type, or a 302
            redirect to the default image when there is no current file or
            the artwork cannot be read.
        """
        try:
            current_file = self.now_playing.get('file_path')
            if not current_file:
                logging.info("album_art_handler:: No current file")
                return RedirectResponse(url=DEFAULT_ALBUM_ART_URL, status_code=302)
            # Map the path stored in the database onto the local mount point.
            current_file = current_file.replace("/paul/toons/", "/singer/gogs_toons/")
            tagged = music_tag.load_file(current_file)
            album_art = tagged.get('artwork').first
            return Response(content=album_art.data, media_type=album_art.mime)
        except Exception:
            # Covers unreadable files and tracks without embedded artwork.
            traceback.print_exc()
            return RedirectResponse(url=DEFAULT_ALBUM_ART_URL, status_code=302)

    async def radio_now_playing(self, request: Request) -> dict:
        """
        Get the currently playing track.

        Args:
            request: Incoming request (unused).
        Returns:
            dict: A copy of the now-playing record with 'elapsed' (seconds
            since 'start') added and 'file_path' removed.
        """
        ret_obj = {**self.now_playing}
        try:
            ret_obj['elapsed'] = int(time.time()) - ret_obj['start']
        except KeyError:
            traceback.print_exc()
            ret_obj['elapsed'] = 0
        # Never expose the on-disk location to clients.
        ret_obj.pop('file_path', None)
        return ret_obj

    async def radio_get_next(self, data: ValidRadioNextRequest, request: Request) -> dict:
        """
        Get next track

        The track is popped from the top of the queue, recorded as now
        playing (with 'start' and 'end' timestamps), and pushed back to the
        end of the queue; when the queue has run down, the playlist is
        reloaded instead.

        Args:
            data: Request body carrying the API key.
            request: Incoming request; its path is passed to the key check.
        Returns:
            dict: The next track, including 'file_path', 'start' and 'end'.
        Raises:
            HTTPException: 403 when the API key is rejected; 503 when no
            valid track is available even after reloading the playlist.
        """
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")

        next_track = None
        # Bounded retry: one attempt against the current playlist and one
        # after a reload, so a persistently broken playlist cannot loop forever.
        for _ in range(2):
            if isinstance(self.active_playlist, list) and self.active_playlist:
                candidate = self.active_playlist.pop(0)
                if isinstance(candidate, dict):
                    next_track = candidate
                    break
                logging.info("next is of type: %s, reloading playlist...", type(candidate))
            else:
                logging.info("radio_get_next:: playlist is empty or not a list, reloading playlist...")
            await self.load_playlist()

        if next_track is None:
            raise HTTPException(status_code=503, detail="No tracks available")

        duration = next_track['duration']
        time_started = int(time.time())
        time_ends = int(time_started + duration)

        if len(self.active_playlist) > 1:
            self.active_playlist.append(next_track)  # Push to end of playlist
        else:
            await self.load_playlist()

        logging.info("Returning %s", next_track['artistsong'])
        # logging.info("Top 5 songs in playlist: %s, bottom: %s",
        #              self.active_playlist[0:6], self.active_playlist[-6:])
        self.now_playing = next_track
        next_track['start'] = time_started
        next_track['end'] = time_ends
        return next_track

    async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> dict:
        """
        Queue a song request, optionally skipping straight to it.

        Args:
            data: Request body; see ValidRadioSongRequest for the fields.
            request: Incoming request; its path is passed to the key check.
        Returns:
            dict: {'result': bool} telling whether a track was queued, or
            {'err': True, 'errorText': ...} for an invalid field combination.
        Raises:
            HTTPException: 403 when the API key is rejected.
        """
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")
        artistsong = data.artistsong
        artist = data.artist
        song = data.song
        if artistsong and (artist or song):
            return {
                'err': True,
                'errorText': 'Invalid request',
            }
        if not artistsong and (not artist or not song):
            return {
                'err': True,
                'errorText': 'Invalid request',
            }

        search = await self.search_playlist(artistsong=artistsong,
                                            artist=artist,
                                            song=song)
        # Only skip when the request was actually queued; otherwise the
        # current track would be cut off for nothing.
        if search and data.alsoSkip:
            await self._ls_skip()
        return {'result': search}