api/endpoints/radio_util.py
2025-02-16 06:53:41 -05:00

351 lines
15 KiB
Python

#!/usr/bin/env python3.12
"""
Radio Utils
"""
import logging
import traceback
import time
import regex
from regex import Pattern
import datetime
import os
import gpt
from aiohttp import ClientSession, ClientTimeout
import aiosqlite as sqlite3
from typing import Union, Optional, LiteralString
from uuid import uuid4 as uuid
from .constructors import RadioException
# Matches any run of two or more whitespace characters; RadioUtil.load_playlist
# substitutes each match with a single space when normalizing track metadata.
double_space: Pattern = regex.compile(r'\s{2,}')
class RadioUtil:
    """
    Radio Utils

    Keeps the in-memory play queue (`active_playlist`) and the `now_playing`
    record, reads/writes the track SQLite database, asks the LiquidSoap
    server to skip tracks, and sends song-change webhooks.
    """

    def __init__(self, constants) -> None:
        """
        Set up default state.

        Args:
            constants: Project constants object; `GPT_WEBHOOK` and
                `SFM_WEBHOOK` are read from it, and it is handed to `gpt.GPT`.
        Returns:
            None
        """
        self.constants = constants
        self.gpt = gpt.GPT(self.constants)
        # Base URI of the LiquidSoap HTTP interface (see _ls_skip).
        self.ls_uri: str = "http://10.10.10.101:29000"
        # SQLite extensions loaded before a search; the search query calls
        # editdist3(), presumably provided by spellfix1 -- TODO confirm.
        self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
        self.active_playlist_path: str = os.path.join("/usr/local/share",
                                                      "sqlite_dbs",
                                                      "track_file_map.db")
        self.active_playlist_name = "default"  # not used
        self.active_playlist: list[dict] = []
        self.now_playing: dict = {
            'artist': 'N/A',
            'song': 'N/A',
            'genre': 'N/A',
            'artistsong': 'N/A - N/A',
            'duration': 0,
            'start': 0,
            'end': 0,
            'file_path': None,
            'id': None,
        }
        self.webhooks: dict = {
            'gpt': {
                'hook': self.constants.GPT_WEBHOOK,
            },
            'sfm': {
                'hook': self.constants.SFM_WEBHOOK,
            },
        }

    def duration_conv(self,
                      s: Union[int, float]) -> str:
        """
        Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
        Args:
            s (Union[int, float]): seconds to convert
        Returns:
            str
        """
        # str(timedelta) appends ".ffffff" for fractional seconds; drop it.
        return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]

    async def search_playlist(self, artistsong: Optional[str] = None,
                              artist: Optional[str] = None,
                              song: Optional[str] = None) -> bool:
        """
        Search for track, add it up next in play queue if found
        Args:
            artistsong (Optional[str]): "Artist - Song" combo to search (cannot be combined with artist/song)
            artist (Optional[str]): Artist to search (requires song; cannot be combined with artistsong)
            song (Optional[str]): Song to search (requires artist; cannot be combined with artistsong)
        Returns:
            bool: True if a match was inserted at the front of the queue,
                False if nothing matched, the query was malformed, or the
                lookup failed
        Raises:
            RadioException: if artistsong is combined with artist/song, or if
                neither artistsong nor both artist and song are given
        """
        if artistsong and (artist or song):
            raise RadioException("Cannot search using combination provided")
        if not artistsong and (not artist or not song):
            raise RadioException("No query provided")

        if artistsong:
            search_artist, separator, search_song = artistsong.partition(" - ")
            if not separator:
                # Without the " - " separator there is no artist/song pair to
                # search for; treat as "not found" rather than as a crash.
                logging.debug("search_playlist:: Query lacks ' - ' separator: %s",
                              artistsong)
                return False
        else:
            search_artist, search_song = artist, song
            artistsong = f"{search_artist} - {search_song}"

        # Single closest match by edit distance; rows whose distance exceeds
        # 410 are rejected.
        search_query: str = (
            'SELECT id, artist, song, (artist || " - " || song) AS artistsong, '
            'genre, file_path, duration FROM tracks '
            'WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?)) '
            '<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
        )
        search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)

        try:
            async with sqlite3.connect(self.active_playlist_path,
                                       timeout=2) as db_conn:
                await db_conn.enable_load_extension(True)
                for ext in self.sqlite_exts:
                    await db_conn.load_extension(ext)
                db_conn.row_factory = sqlite3.Row
                async with db_conn.execute(search_query, search_params) as db_cursor:
                    result: Optional[sqlite3.Row] = await db_cursor.fetchone()
                    if not result or not isinstance(result, sqlite3.Row):
                        return False
                    queue_entry: dict = {
                        'id': result['id'],
                        'uuid': str(uuid().hex),
                        'artist': result['artist'].strip(),
                        'song': result['song'].strip(),
                        'artistsong': result['artistsong'].strip(),
                        'genre': result['genre'],
                        'file_path': result['file_path'],
                        'duration': result['duration'],
                    }
                    # Index 0 is the head of the queue, i.e. "up next".
                    self.active_playlist.insert(0, queue_entry)
                    return True
        except Exception as e:
            logging.critical("search_playlist:: Search error occurred: %s", str(e))
            traceback.print_exc()
            return False

    async def load_playlist(self) -> None:
        """
        Load Playlist

        Rebuilds `active_playlist` from the tracks table in random order,
        restricted to a fixed list of genres. Errors are printed, not raised.
        Returns:
            None
        """
        try:
            logging.info("Loading playlist...")
            # Emptied in place before querying, so the queue stays empty if
            # the load below fails.
            self.active_playlist.clear()

            # ALL TRACKS
            # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
            # GROUP BY artistdashsong ORDER BY RANDOM()'

            # LIMITED GENRES
            db_query: str = (
                'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks '
                'WHERE genre IN ("metalcore", "rock", "pop punk", "math rock", "punk rock", '
                '"metal", "punk", "electronic", "nu metal", "EDM", '
                '"post-hardcore", "pop rock", "experimental", "post-punk", "death metal", "electronicore", "hard rock", "psychedelic rock", '
                '"grunge", "house", "dubstep", "hardcore", "hair metal", "horror punk", "folk punk", "breakcore", '
                '"post-rock", "deathcore", "hardcore punk", "synthwave", "trap") GROUP BY artistdashsong ORDER BY RANDOM()'
            )

            # LIMITED TO ONE ARTIST...
            # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
            # WHERE artist LIKE "%bad omens%" GROUP BY artistdashsong ORDER BY RANDOM()'

            async with sqlite3.connect(self.active_playlist_path,
                                       timeout=2) as db_conn:
                db_conn.row_factory = sqlite3.Row
                async with db_conn.execute(db_query) as db_cursor:
                    results: list[sqlite3.Row] = await db_cursor.fetchall()
                    self.active_playlist = [{
                        'uuid': str(uuid().hex),
                        'id': r['id'],
                        'artist': double_space.sub(' ', r['artist']).strip(),
                        'song': double_space.sub(' ', r['song']).strip(),
                        'genre': r['genre'] if r['genre'] else 'Unknown',
                        'artistsong': double_space.sub(' ', r['artistdashsong']).strip(),
                        'file_path': r['file_path'],
                        'duration': r['duration'],
                    } for r in results]
                    logging.info("Populated active playlists with %s items",
                                 len(self.active_playlist))
        except Exception:
            # Not a bare except: task cancellation and interpreter-exit
            # signals must keep propagating.
            traceback.print_exc()

    async def cache_album_art(self, track_id: int,
                              album_art: bytes) -> None:
        """
        Cache Album Art to SQLite DB
        Args:
            track_id (int): Track ID to update
            album_art (bytes): Album art data
        Returns:
            None
        """
        try:
            async with sqlite3.connect(self.active_playlist_path,
                                       timeout=2) as db_conn:
                async with db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?",
                                           (album_art, track_id,)):
                    await db_conn.commit()
        except Exception:
            # Best effort: a failed cache write is printed, never raised.
            traceback.print_exc()

    async def get_album_art(self, track_id: Optional[int] = None,
                            file_path: Optional[str] = None) -> Optional[bytes]:
        """
        Get Album Art
        Args:
            track_id (Optional[int]): Track ID to query (takes precedence when both are given)
            file_path (Optional[str]): file_path to query (used only when track_id is not given)
        Returns:
            Optional[bytes]: the stored album_art value, or None when no row
                matches or the lookup fails
        """
        try:
            async with sqlite3.connect(self.active_playlist_path,
                                       timeout=2) as db_conn:
                db_conn.row_factory = sqlite3.Row
                query: str = "SELECT album_art FROM tracks WHERE id = ?"
                query_params: tuple = (track_id,)

                if file_path and not track_id:
                    query = "SELECT album_art FROM tracks WHERE file_path = ?"
                    query_params = (file_path,)

                async with db_conn.execute(query,
                                           query_params) as db_cursor:
                    result: Optional[sqlite3.Row] = await db_cursor.fetchone()
                    if not result or not isinstance(result, sqlite3.Row):
                        return None
                    return result['album_art']
        except Exception:
            traceback.print_exc()
            return None

    def get_queue_item_by_uuid(self,
                               uuid: str) -> Optional[tuple[int, dict]]:
        """
        Get queue item by UUID
        Args:
            uuid: The UUID to search
        Returns:
            Optional[tuple[int, dict]]: (queue index, item) of the first
                match, or None if no item carries that UUID
        """
        # NOTE: the `uuid` parameter shadows the module-level uuid4 alias
        # inside this method; the name is kept because it is part of the
        # public signature.
        for x, item in enumerate(self.active_playlist):
            if item.get('uuid') == uuid:
                return (x, item)
        return None

    async def _ls_skip(self) -> bool:
        """
        Ask LiquidSoap server to skip to the next track
        Args:
            None
        Returns:
            bool: True only if the server answered with the body "OK"
        """
        try:
            async with ClientSession() as session:
                async with session.get(f"{self.ls_uri}/next",
                                       timeout=ClientTimeout(connect=2, sock_read=2)) as request:
                    request.raise_for_status()
                    text: Optional[str] = await request.text()
                    return text == "OK"
        except Exception as e:
            logging.debug("Skip failed: %s", str(e))

        return False  # failsafe

    async def get_ai_song_info(self, artist: str,
                               song: str) -> Optional[str]:
        """
        Get AI Song Info
        Args:
            artist (str)
            song (str)
        Returns:
            Optional[str]: the completion text, or None if nothing came back
        """
        # NOTE(review): the prompt starts with " am" -- it looks like a
        # leading "I" was dropped, unless gpt.get_completion prepends it.
        # Left byte-identical; confirm against the gpt module.
        prompt: str = f" am going to listen to {song} by {artist}."
        response: Optional[str] = await self.gpt.get_completion(prompt)
        if not response:
            logging.critical("No response received from GPT?")
            return None
        return response

    async def _post_webhook(self, hook: str, payload: dict) -> None:
        """POST a JSON payload to a webhook URL; raises on an HTTP error status."""
        async with ClientSession() as session:
            async with session.post(hook, json=payload,
                                    timeout=ClientTimeout(connect=5, sock_read=5),
                                    headers={
                                        'content-type': 'application/json; charset=utf-8',
                                    }) as request:
                request.raise_for_status()

    async def webhook_song_change(self, track: dict) -> None:
        """
        Handles Song Change Outbounds (Webhooks)

        Sends a "Now Playing" embed to the 'sfm' hook, then an "AI Feedback"
        embed to the 'gpt' hook. Any failure is printed and aborts the
        remaining steps; nothing is raised to the caller.
        Args:
            track (dict): must provide 'song', 'artist', 'genre', 'duration',
                'file_path', 'id', 'start' and 'end'
        Returns:
            None
        """
        try:
            # First, send track info
            # NOTE(review): these two timestamps are computed but never sent,
            # as in the original; kept so a missing 'start'/'end' still aborts.
            friendly_track_start: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['start']))
            friendly_track_end: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['end']))

            # splitext yields '' for an extension-less path instead of raising,
            # so a missing extension no longer cancels the whole notification.
            file_ext: str = os.path.splitext(track['file_path'])[1].lstrip(".")

            hook_data: dict = {
                'username': 'serious.FM',
                "embeds": [{
                    "title": "Now Playing",
                    "description": f"## {track['song']}\nby\n## {track['artist']}",
                    "color": 0x30c56f,
                    "thumbnail": {
                        # The trailing timestamp makes every URL unique,
                        # presumably to defeat thumbnail caching.
                        "url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{time.time()}",
                    },
                    "fields": [
                        {
                            "name": "Duration",
                            "value": self.duration_conv(track['duration']),
                            "inline": True,
                        },
                        {
                            "name": "Genre",
                            "value": track['genre'] if track['genre'] else 'Unknown',
                            "inline": True,
                        },
                        {
                            "name": "Filetype",
                            "value": file_ext if file_ext else 'Unknown',
                            "inline": True,
                        },
                        {
                            "name": "Higher Res",
                            "value": "[stream/icecast](https://relay.sfm.codey.lol/aces.ogg) | [web player](https://codey.lol/radio)",
                            "inline": True,
                        },
                    ],
                }],
            }

            sfm_hook: str = self.webhooks['sfm'].get('hook')
            await self._post_webhook(sfm_hook, hook_data)

            # Next, AI feedback
            ai_response: Optional[str] = await self.get_ai_song_info(track['artist'],
                                                                     track['song'])
            if not ai_response:
                return

            hook_data = {
                'username': 'GPT',
                "embeds": [{
                    "title": "AI Feedback",
                    "color": 0x35d0ff,
                    "description": ai_response.strip(),
                }],
            }

            ai_hook: str = self.webhooks['gpt'].get('hook')
            await self._post_webhook(ai_hook, hook_data)
        except Exception:
            traceback.print_exc()