#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught, invalid-name
"""Radio endpoints: play queue management, song requests, now-playing and album art."""

import logging
import traceback
import os
import aiosqlite as sqlite3
import time
import random
import asyncio
import regex
import music_tag
from . import radio_util
from uuid import uuid4 as uuid
from typing import Optional
from pydantic import BaseModel
from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException
from fastapi.responses import RedirectResponse
from aiohttp import ClientSession, ClientTimeout

# Collapses runs of two or more whitespace characters (used on track metadata).
double_space = regex.compile(r'\s{2,}')

# TODO: minor refactoring/type annotations/docstrings


class RadioException(Exception):
    """Raised by Radio.search_playlist when the search arguments are invalid."""


class ValidRadioSongRequest(BaseModel):
    """
    - **key**: API Key
    - **artist**: artist to search
    - **song**: song to search
    - **artistsong**: may be used IN PLACE OF artist/song to perform a combined/string search in the format "artist - song"
    - **alsoSkip**: Whether to skip immediately to this track
    """
    key: str
    artist: str | None = None
    song: str | None = None
    artistsong: str | None = None
    alsoSkip: bool = False


class ValidRadioNextRequest(BaseModel):
    """
    - **key**: API Key
    - **skipTo**: UUID to skip to [optional]
    """
    key: str
    skipTo: str | None = None


class ValidRadioReshuffleRequest(ValidRadioNextRequest):
    """
    - **key**: API Key
    """


class ValidRadioQueueShiftRequest(BaseModel):
    """
    - **key**: API Key
    - **uuid**: UUID to shift
    - **next**: Play next if true, immediately if false, default False
    """
    key: str
    uuid: str
    next: bool = False


class ValidRadioQueueRemovalRequest(BaseModel):
    """
    - **key**: API Key
    - **uuid**: UUID to remove
    """
    key: str
    uuid: str


class Radio(FastAPI):
    """Radio Endpoints"""

    # Fallback image served when no album art is available for the current track.
    default_album_art_url = "https://codey.lol/images/radio_art_default.jpg"

    def __init__(self, app: FastAPI, my_util, constants, glob_state) -> None:  # pylint: disable=super-init-not-called
        """
        Register the radio routes on ``app`` and load the initial playlist.

        Args:
            app: The FastAPI application the routes are added to
            my_util: Utility object; its ``check_key`` method is used for API key checks
            constants: Passed through to ``radio_util.RadioUtil``
            glob_state: Stored on the instance as ``self.glob_state``
        """
        self.app = app
        self.util = my_util
        self.constants = constants
        self.radio_util = radio_util.RadioUtil(self.constants)
        self.glob_state = glob_state
        self.ls_uri = "http://10.10.10.101:29000"
        self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
        self.active_playlist_path = os.path.join("/usr/local/share",
                                                 "sqlite_dbs", "track_file_map.db")
        self.active_playlist_name = "default"  # not used
        self.active_playlist: list[dict] = []
        self.now_playing = {
            'artist': 'N/A',
            'song': 'N/A',
            'artistsong': 'N/A - N/A',
            'duration': 0,
            'start': 0,
            'end': 0,
            'file_path': None,
            'id': None,
        }
        self.endpoints = {
            "radio/np": self.radio_now_playing,
            "radio/request": self.radio_request,
            "radio/get_queue": self.radio_get_queue,
            "radio/skip": self.radio_skip,
            "radio/queue_shift": self.radio_queue_shift,
            "radio/reshuffle": self.radio_reshuffle,
            "radio/queue_remove": self.radio_queue_remove,
            "radio/ls._next_": self.radio_get_next,
        }

        for endpoint, handler in self.endpoints.items():
            app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
                              include_in_schema=False)

        # NOTE: Not in loop because method is GET for this endpoint
        app.add_api_route("/radio/album_art", self.album_art_handler, methods=["GET"],
                          include_in_schema=True)

        # Blocks construction until the initial playlist has been loaded.
        # run_until_complete() cannot be used while the loop is already running,
        # so this instance must be created before the event loop is started.
        asyncio.get_event_loop().run_until_complete(self.load_playlist())

    def get_queue_item_by_uuid(self, uuid: str) -> tuple[int, dict] | None:
        """
        Get queue item by UUID

        Args:
            uuid: The UUID to search
        Returns:
            tuple[int, dict]|None: (queue position, queue item), or None if
                no item in the queue has the given UUID
        """
        for x, item in enumerate(self.active_playlist):
            if item.get('uuid') == uuid:
                return (x, item)
        return None

    async def _ls_skip(self) -> bool:
        """
        Request ``{ls_uri}/next`` to skip the current track.

        Returns:
            bool: True if the response body was exactly "OK"
        Raises:
            aiohttp client errors on connection failure, timeout or non-2xx status
        """
        async with ClientSession() as session:
            async with session.get(f"{self.ls_uri}/next",
                                   timeout=ClientTimeout(connect=2, sock_read=2)) as request:
                request.raise_for_status()
                text = await request.text()
                return text == "OK"

    async def radio_skip(self, data: ValidRadioNextRequest, request: Request) -> bool:
        """
        Skip to the next track in the queue, or to uuid specified in skipTo if provided

        Returns:
            bool: True if the skip succeeded; False if skipTo was not found in
                the queue or the skip request failed
        Raises:
            HTTPException: 403 if the API key is not authorized
        """
        # The key check lives outside the try block so the 403 reaches the
        # client instead of being swallowed by the generic handler below.
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")
        try:
            if data.skipTo:
                queue_item = self.get_queue_item_by_uuid(data.skipTo)
                if queue_item is None:
                    return False
                (x, _) = queue_item
                # Drop everything queued ahead of the requested track
                self.active_playlist = self.active_playlist[x:]
            if not self.active_playlist:
                await self.load_playlist()
            return await self._ls_skip()
        except Exception:
            traceback.print_exc()
            return False

    async def radio_reshuffle(self, data: ValidRadioReshuffleRequest, request: Request) -> dict:
        """
        Reshuffle the play queue

        Raises:
            HTTPException: 403 if the API key is not authorized
        """
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")

        random.shuffle(self.active_playlist)
        return {
            'ok': True
        }

    async def radio_get_queue(self, request: Request, limit: int = 20_000) -> dict:
        """
        Get current play queue, up to limit n [default: 20k]

        Args:
            limit (int): Number of results to return (default 20k)
        Returns:
            dict: ``{'items': [...]}`` with one entry per queued track
        """
        # A negative limit would otherwise slice relative to the end of the list
        limit = max(limit, 0)
        queue_out = [{
            'pos': x,
            'id': item.get('id'),
            'uuid': item.get('uuid'),
            'artist': item.get('artist'),
            'song': item.get('song'),
            'artistsong': item.get('artistsong'),
            'duration': item.get('duration'),
        } for x, item in enumerate(self.active_playlist[:limit])]
        return {
            'items': queue_out
        }

    async def radio_queue_shift(self, data: ValidRadioQueueShiftRequest, request: Request) -> dict:
        """
        Shift position of a UUID within the queue [currently limited to playing next or immediately]

        Returns:
            dict: ``{'ok': True}`` on success, or ``{'ok': False, 'err': ...}``
                if the UUID is not in the queue
        Raises:
            HTTPException: 403 if the API key is not authorized
        """
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")

        queue_item = self.get_queue_item_by_uuid(data.uuid)
        if queue_item is None:
            return {
                'ok': False,
                'err': 'UUID not found in play queue',
            }
        (x, item) = queue_item
        self.active_playlist.pop(x)
        self.active_playlist.insert(0, item)
        if not data.next:
            # "Immediately": skip the current track so the shifted one starts now
            await self._ls_skip()
        return {
            'ok': True,
        }

    async def radio_queue_remove(self, data: ValidRadioQueueRemovalRequest, request: Request) -> dict:
        """
        Remove an item from the current play queue

        Returns:
            dict: ``{'ok': True}`` on success, or ``{'ok': False, 'err': ...}``
                if the UUID is not in the queue
        Raises:
            HTTPException: 403 if the API key is not authorized
        """
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")

        queue_item = self.get_queue_item_by_uuid(data.uuid)
        if queue_item is None:
            return {
                'ok': False,
                'err': 'UUID not found in play queue',
            }
        (x, _) = queue_item
        self.active_playlist.pop(x)
        return {
            'ok': True,
        }

    async def search_playlist(self, artistsong: str | None = None, artist: str | None = None,
                              song: str | None = None) -> bool:
        """
        Search the track database and push the best match to the front of the queue

        Args:
            artistsong: Combined query in the format "artist - song"
            artist: Artist to search (used together with song)
            song: Song to search (used together with artist)
        Returns:
            bool: True if a track was found and queued, False otherwise
        Raises:
            RadioException: if artistsong is combined with artist/song, or if
                no complete query was provided
        """
        if artistsong and (artist or song):
            raise RadioException("Cannot search using combination provided")
        if not artistsong and (not artist or not song):
            raise RadioException("No query provided")
        try:
            search_query = (
                'SELECT id, artist, song, (artist || " - " || song) AS artistsong, '
                'file_path, duration FROM tracks '
                'WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?)) '
                '<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) '
                'ASC LIMIT 1'
            )
            if artistsong:
                artistsong_split = artistsong.split(" - ", maxsplit=1)
                if len(artistsong_split) != 2:
                    # No " - " separator, so artist and song cannot be told apart
                    return False
                (search_artist, search_song) = artistsong_split
            else:
                search_artist = artist
                search_song = song
                artistsong = f"{search_artist} - {search_song}"
            search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)
            async with sqlite3.connect(self.active_playlist_path,
                                       timeout=2) as db_conn:
                # editdist3() is provided by the loadable extension(s)
                await db_conn.enable_load_extension(True)
                for ext in self.sqlite_exts:
                    await db_conn.load_extension(ext)
                db_conn.row_factory = sqlite3.Row
                async with await db_conn.execute(search_query, search_params) as db_cursor:
                    result = await db_cursor.fetchone()
                    if not result:
                        return False
                    pushObj = {
                        'uuid': str(uuid().hex),
                        # 'id' is read by radio_get_next when caching album art
                        'id': result['id'],
                        'artist': result['artist'].strip(),
                        'song': result['song'].strip(),
                        'artistsong': result['artistsong'].strip(),
                        'file_path': result['file_path'],
                        'duration': result['duration'],
                    }
                    self.active_playlist.insert(0, pushObj)
                    return True
        except Exception as e:
            logging.critical("search_playlist:: Search error occurred: %s", str(e))
            traceback.print_exc()
            return False

    async def load_playlist(self) -> None:
        """
        Load a freshly shuffled playlist from the track database.

        The current queue is only replaced once the query has succeeded; on
        failure the traceback is printed and the existing queue is kept.
        """
        try:
            logging.critical("Loading playlist...")
            db_query = (
                'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, '
                'file_path, duration FROM tracks '
                'GROUP BY artistdashsong ORDER BY RANDOM()'
            )

            async with sqlite3.connect(self.active_playlist_path,
                                       timeout=2) as db_conn:
                db_conn.row_factory = sqlite3.Row
                async with await db_conn.execute(db_query) as db_cursor:
                    results = await db_cursor.fetchall()
            self.active_playlist = [{
                'uuid': str(uuid().hex),
                'id': r['id'],
                'artist': double_space.sub(' ', r['artist']).strip(),
                'song': double_space.sub(' ', r['song']).strip(),
                'artistsong': double_space.sub(' ', r['artistdashsong']).strip(),
                'file_path': r['file_path'],
                'duration': r['duration'],
            } for r in results]
            logging.info("Populated active playlists with %s items",
                         len(self.active_playlist))
        except Exception:
            traceback.print_exc()

    async def cache_album_art(self, track_id: int, album_art: bytes) -> None:
        """
        Store album art for a track in the ``album_art`` column of ``tracks``.

        Args:
            track_id: Row id of the track to update
            album_art: Raw image data to store
        """
        try:
            logging.info("Attempting cache for %s", track_id)
            async with sqlite3.connect(self.active_playlist_path,
                                       timeout=2) as db_conn:
                async with await db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?",
                                                 (album_art, track_id,)):
                    await db_conn.commit()
                    logging.info("Committed %s", track_id)
        except Exception:
            traceback.print_exc()

    async def get_album_art(self, track_id: Optional[int] = None,
                            file_path: Optional[str] = None) -> bytes | None:
        """
        Read cached album art from the database.

        Args:
            track_id: Track id to look up (used when file_path is not given)
            file_path: Track file path to look up; takes precedence over track_id
        Returns:
            bytes|None: The stored album art, or None if there is no matching
                row, nothing is stored, or the lookup failed
        """
        try:
            async with sqlite3.connect(self.active_playlist_path,
                                       timeout=2) as db_conn:
                db_conn.row_factory = sqlite3.Row
                query = "SELECT album_art FROM tracks WHERE id = ?"
                query_params = (track_id,)

                if file_path:
                    query = "SELECT album_art FROM tracks WHERE file_path = ?"
                    query_params = (file_path,)

                async with await db_conn.execute(query, query_params) as db_cursor:
                    result = await db_cursor.fetchone()
                    if not result:
                        return None
                    return result['album_art']
        except Exception:
            traceback.print_exc()
            return None

    async def _get_album_art(self, track_id: Optional[int] = None,
                             file_path: Optional[str] = None) -> bytes | None:
        """
        Get album art from the database cache, falling back to the file's tags.

        Args:
            track_id: Currently unused
            file_path: Track file path; defaults to the now-playing track's path
        Returns:
            bytes|None: Image data, or None if unavailable
        """
        try:
            if not file_path:
                file_path = self.now_playing.get('file_path')

            if not file_path:
                logging.info("_get_album_art:: No current file")
                return None
            original_file_path = file_path
            # The database is keyed on the original path; the file itself is
            # read from the remapped location.
            file_path = file_path.replace("/paul/toons/",
                                          "/singer/gogs_toons/")
            logging.info("Seeking %s", original_file_path)
            cached_album_art = await self.get_album_art(file_path=original_file_path)
            if cached_album_art:
                logging.info("Returning from cache!")
                return cached_album_art

            # Not cached, read from file
            tagged = music_tag.load_file(file_path)
            album_art = tagged.get('artwork').first
            if not album_art:
                # File carries no embedded artwork
                return None
            logging.info("Returning from file read!")
            return album_art.data
        except Exception:
            traceback.print_exc()
            return None

    # TODO: Optimize/cache
    async def album_art_handler(self, request: Request) -> Response:
        """
        Serve album art for the now-playing track.

        Returns:
            Response: The image data, or a 302 redirect to the default image
                when no art is available or an error occurs
        """
        try:
            album_art = await self._get_album_art()
            if not album_art:
                return RedirectResponse(url=self.default_album_art_url,
                                        status_code=302)
            return Response(content=album_art,
                            media_type="image/png")
        except Exception:
            traceback.print_exc()
            return RedirectResponse(url=self.default_album_art_url,
                                    status_code=302)

    async def radio_now_playing(self, request: Request) -> dict:
        """
        Get the now-playing track.

        Returns:
            dict: Copy of the now-playing info with ``elapsed`` (seconds since
                ``start``) added and ``file_path`` removed
        """
        ret_obj = {**self.now_playing}
        try:
            ret_obj['elapsed'] = int(time.time()) - ret_obj['start']
        except KeyError:
            traceback.print_exc()
            ret_obj['elapsed'] = 0
        # Server-side path is not exposed to clients
        ret_obj.pop('file_path', None)
        return ret_obj

    async def radio_get_next(self, data: ValidRadioNextRequest, request: Request,
                             background_tasks: BackgroundTasks) -> dict:
        """
        Get next track

        Track will be removed from the queue in the process (pop from top of list).

        Returns:
            dict: Next track in queue, with ``start`` and ``end`` timestamps added
        Raises:
            HTTPException: 403 if the API key is not authorized
        """
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")

        # NOTE(review): radio_pop_track is not defined in the visible portion of
        # this class -- confirm it exists, otherwise both fallbacks below raise
        # AttributeError.
        if not (isinstance(self.active_playlist, list) and self.active_playlist):
            return await self.radio_pop_track(request, recursion_type="not list: self.active_playlist")

        next_track = self.active_playlist.pop(0)
        if not isinstance(next_track, dict):
            logging.info("next is of type: %s, reloading playlist...", type(next_track))
            await self.load_playlist()
            return await self.radio_pop_track(request, recursion_type="not dict: next")

        duration = next_track['duration']
        time_started = int(time.time())
        time_ends = int(time_started + duration)

        if len(self.active_playlist) > 1:
            self.active_playlist.append(next_track)  # Push to end of playlist
        else:
            await self.load_playlist()

        logging.info("Returning %s",
                     next_track['artistsong'])
        # logging.info("Top 5 songs in playlist: %s, bottom: %s",
        #              self.active_playlist[0:6], self.active_playlist[-6:])
        self.now_playing = next_track
        next_track['start'] = time_started
        next_track['end'] = time_ends
        try:
            background_tasks.add_task(self.radio_util.webhook_song_change, next_track)
        except Exception:
            traceback.print_exc()
        try:
            if not await self.get_album_art(file_path=next_track['file_path']):
                album_art = await self._get_album_art(file_path=next_track['file_path'])
                # Only cache when there is both art to store and a row id to store it under
                if album_art and next_track.get('id') is not None:
                    await self.cache_album_art(next_track['id'], album_art)
        except Exception:
            traceback.print_exc()
        return next_track

    async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> dict:
        """
        Request a song; the best match is pushed to the front of the queue.

        Returns:
            dict: ``{'result': bool}``, or ``{'err': True, 'errorText': ...}``
                for an invalid combination of search fields
        Raises:
            HTTPException: 403 if the API key is not authorized
        """
        if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
            raise HTTPException(status_code=403, detail="Unauthorized")
        artistsong = data.artistsong
        artist = data.artist
        song = data.song
        if artistsong and (artist or song):
            return {
                'err': True,
                'errorText': 'Invalid request',
            }
        if not artistsong and (not artist or not song):
            return {
                'err': True,
                'errorText': 'Invalid request',
            }

        search = await self.search_playlist(artistsong=artistsong,
                                            artist=artist,
                                            song=song)
        # Only skip when the request was actually queued; otherwise the current
        # track would be skipped in favour of an unrelated one.
        if data.alsoSkip and search:
            await self._ls_skip()
        return {'result': search}