398 lines
19 KiB
Python
398 lines
19 KiB
Python
#!/usr/bin/env python3.12
|
|
|
|
"""
|
|
Radio Utils
|
|
"""
|
|
|
|
import logging
|
|
import traceback
|
|
import time
|
|
import regex
|
|
from regex import Pattern
|
|
import datetime
|
|
import os
|
|
import gpt
|
|
from aiohttp import ClientSession, ClientTimeout
|
|
import aiosqlite as sqlite3
|
|
from typing import Union, Optional, LiteralString, Iterable
|
|
from uuid import uuid4 as uuid
|
|
from .constructors import RadioException
|
|
|
|
double_space: Pattern = regex.compile(r'\s{2,}')
|
|
|
|
class RadioUtil:
|
|
"""
|
|
Radio Utils
|
|
"""
|
|
def __init__(self, constants) -> None:
|
|
self.constants = constants
|
|
self.gpt = gpt.GPT(self.constants)
|
|
self.ls_uri: str = "http://10.10.10.101:29000"
|
|
self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
|
|
self.active_playlist_path: Union[str, LiteralString] = os.path\
|
|
.join("/usr/local/share",
|
|
"sqlite_dbs", "track_file_map.db")
|
|
self.active_playlist_name = "default" # not used
|
|
self.active_playlist: list[dict] = []
|
|
self.now_playing: dict = {
|
|
'artist': 'N/A',
|
|
'song': 'N/A',
|
|
'genre': 'N/A',
|
|
'artistsong': 'N/A - N/A',
|
|
'duration': 0,
|
|
'start': 0,
|
|
'end': 0,
|
|
'file_path': None,
|
|
'id': None,
|
|
}
|
|
self.webhooks: dict = {
|
|
'gpt': {
|
|
'hook': self.constants.GPT_WEBHOOK,
|
|
},
|
|
'sfm': {
|
|
'hook': self.constants.SFM_WEBHOOK,
|
|
}
|
|
}
|
|
|
|
def duration_conv(self,
|
|
s: Union[int, float]) -> str:
|
|
"""
|
|
Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
|
|
Args:
|
|
s (Union[int, float]): seconds to convert
|
|
Returns:
|
|
str
|
|
"""
|
|
return str(datetime.timedelta(seconds=s))\
|
|
.split(".", maxsplit=1)[0]
|
|
|
|
async def trackdb_typeahead(self, query: str) -> Optional[list[str]]:
|
|
if not query:
|
|
return None
|
|
async with sqlite3.connect(self.active_playlist_path,
|
|
timeout=1) as _db:
|
|
_db.row_factory = sqlite3.Row
|
|
db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\
|
|
(TRIM(artist) || " - " || TRIM(song)) as artistsong FROM tracks WHERE\
|
|
artistsong LIKE ? LIMIT 30"""
|
|
db_params: tuple[str] = (f"%{query}%",)
|
|
async with _db.execute(db_query, db_params) as _cursor:
|
|
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
|
|
out_result = [
|
|
str(r['artistsong']) for r in result
|
|
]
|
|
return out_result
|
|
|
|
async def search_playlist(self, artistsong: Optional[str] = None,
|
|
artist: Optional[str] = None,
|
|
song: Optional[str] = None) -> bool:
|
|
"""
|
|
Search for track, add it up next in play queue if found
|
|
Args:
|
|
artistsong (Optional[str]): Artist - Song combo to search [ignored if artist/song are specified]
|
|
artist (Optional[str]): Artist to search (ignored if artistsong is specified)
|
|
song (Optional[str]): Song to search (ignored if artistsong is specified)
|
|
Returns:
|
|
bool
|
|
"""
|
|
if artistsong and (artist or song):
|
|
raise RadioException("Cannot search using combination provided")
|
|
if not artistsong and (not artist or not song):
|
|
raise RadioException("No query provided")
|
|
try:
|
|
search_query: str = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, genre, file_path, duration FROM tracks\
|
|
WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
|
|
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
|
|
if artistsong:
|
|
artistsong_split: list = artistsong.split(" - ", maxsplit=1)
|
|
(search_artist, search_song) = tuple(artistsong_split)
|
|
else:
|
|
search_artist = artist
|
|
search_song = song
|
|
if not artistsong:
|
|
artistsong = f"{search_artist} - {search_song}"
|
|
search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)
|
|
async with sqlite3.connect(self.active_playlist_path,
|
|
timeout=2) as db_conn:
|
|
await db_conn.enable_load_extension(True)
|
|
for ext in self.sqlite_exts:
|
|
await db_conn.load_extension(ext)
|
|
db_conn.row_factory = sqlite3.Row
|
|
async with await db_conn.execute(search_query, search_params) as db_cursor:
|
|
result: Optional[sqlite3.Row|bool] = await db_cursor.fetchone()
|
|
if not result or not isinstance(result, sqlite3.Row):
|
|
return False
|
|
pushObj: dict = {
|
|
'id': result['id'],
|
|
'uuid': str(uuid().hex),
|
|
'artist': result['artist'].strip(),
|
|
'song': result['song'].strip(),
|
|
'artistsong': result['artistsong'].strip(),
|
|
'genre': result['genre'],
|
|
'file_path': result['file_path'],
|
|
'duration': result['duration'],
|
|
}
|
|
self.active_playlist.insert(0, pushObj)
|
|
return True
|
|
except Exception as e:
|
|
logging.critical("search_playlist:: Search error occurred: %s", str(e))
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
async def load_playlist(self) -> None:
|
|
"""Load Playlist"""
|
|
try:
|
|
logging.info(f"Loading playlist...")
|
|
self.active_playlist.clear()
|
|
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
|
|
# GROUP BY artistdashsong ORDER BY RANDOM()'
|
|
|
|
"""
|
|
LIMITED GENRES
|
|
"""
|
|
|
|
db_query: str = """SELECT distinct(LOWER(TRIM(artist)) || " - " || LOWER(TRIM(song))), (TRIM(artist) || " - " || TRIM(song)) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
|
|
WHERE genre LIKE "%metalcore%"\
|
|
OR genre LIKE "%rock%"\
|
|
OR genre LIKE "%pop punk%"\
|
|
OR genre LIKE "%math rock%"\
|
|
OR genre LIKE "%punk rock%"\
|
|
OR genre LIKE "%metal%"\
|
|
OR genre LIKE "%punk%"\
|
|
OR genre LIKE "%electronic%"\
|
|
OR genre LIKE "%nu metal%"\
|
|
OR genre LIKE "%EDM%"\
|
|
OR genre LIKE "%post-hardcore%"\
|
|
OR genre LIKE "%pop rock%"\
|
|
OR genre LIKE "%experimental%"\
|
|
OR genre LIKE "%post-punk%"\
|
|
OR genre LIKE "%death metal%"\
|
|
OR genre LIKE "%electronicore%"\
|
|
OR genre LIKE "%hard rock%"\
|
|
OR genre LIKE "%psychedelic rock%"\
|
|
OR genre LIKE "%grunge%"\
|
|
OR genre LIKE "%house%"\
|
|
OR genre LIKE "%dubstep%"\
|
|
OR genre LIKE "%hardcore%"\
|
|
OR genre LIKE "%hair metal%"\
|
|
OR genre LIKE "%horror punk%"\
|
|
OR genre LIKE "%folk punk%"\
|
|
OR genre LIKE "%breakcore%"\
|
|
OR genre LIKE "%post-rock%"\
|
|
OR genre LIKE "%deathcore%"\
|
|
OR genre LIKE "%hardcore punk%"\
|
|
OR genre LIKE "%synthwave%"\
|
|
OR genre LIKE "%trap%"\
|
|
OR genre LIKE "%indie pop%"\
|
|
OR genre LIKE "untagged"\
|
|
GROUP BY artistdashsong ORDER BY RANDOM()"""
|
|
|
|
"""
|
|
LIMITED TO ONE ARTIST...
|
|
"""
|
|
|
|
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
|
|
# WHERE artist LIKE "%bad omens%" GROUP BY artistdashsong ORDER BY RANDOM()'
|
|
|
|
async with sqlite3.connect(self.active_playlist_path,
|
|
timeout=2) as db_conn:
|
|
db_conn.row_factory = sqlite3.Row
|
|
async with await db_conn.execute(db_query) as db_cursor:
|
|
results: list[sqlite3.Row] = await db_cursor.fetchall()
|
|
self.active_playlist = [{
|
|
'uuid': str(uuid().hex),
|
|
'id': r['id'],
|
|
'artist': double_space.sub(' ', r['artist']).strip(),
|
|
'song': double_space.sub(' ', r['song']).strip(),
|
|
'genre': r['genre'] if r['genre'] else 'Unknown',
|
|
'artistsong': double_space.sub(' ', r['artistdashsong']).strip(),
|
|
'file_path': r['file_path'],
|
|
'duration': r['duration'],
|
|
} for r in results]
|
|
logging.info("Populated active playlists with %s items",
|
|
len(self.active_playlist))
|
|
except:
|
|
traceback.print_exc()
|
|
|
|
async def cache_album_art(self, track_id: int,
|
|
album_art: bytes) -> None:
|
|
"""
|
|
Cache Album Art to SQLite DB
|
|
Args:
|
|
track_id (int): Track ID to update
|
|
album_art (bytes): Album art data
|
|
Returns:
|
|
None
|
|
"""
|
|
try:
|
|
async with sqlite3.connect(self.active_playlist_path,
|
|
timeout=2) as db_conn:
|
|
async with await db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?",
|
|
(album_art, track_id,)) as db_cursor:
|
|
await db_conn.commit()
|
|
except:
|
|
traceback.print_exc()
|
|
|
|
async def get_album_art(self, track_id: Optional[int] = None,
|
|
file_path: Optional[str] = None) -> Optional[bytes]:
|
|
"""
|
|
Get Album Art
|
|
Args:
|
|
track_id (Optional[int]): Track ID to query (ignored if file_path is specified)
|
|
file_path (Optional[str]): file_path to query (ignored if track_id is specified)
|
|
Returns:
|
|
bytes
|
|
"""
|
|
try:
|
|
async with sqlite3.connect(self.active_playlist_path,
|
|
timeout=2) as db_conn:
|
|
db_conn.row_factory = sqlite3.Row
|
|
query: str = "SELECT album_art FROM tracks WHERE id = ?"
|
|
query_params: tuple = (track_id,)
|
|
|
|
if file_path and not track_id:
|
|
query = "SELECT album_art FROM tracks WHERE file_path = ?"
|
|
query_params = (file_path,)
|
|
|
|
async with await db_conn.execute(query,
|
|
query_params) as db_cursor:
|
|
result: Optional[Union[sqlite3.Row, bool]] = await db_cursor.fetchone()
|
|
if not result or not isinstance(result, sqlite3.Row):
|
|
return None
|
|
return result['album_art']
|
|
except:
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
def get_queue_item_by_uuid(self,
|
|
uuid: str) -> Optional[tuple[int, dict]]:
|
|
"""
|
|
Get queue item by UUID
|
|
Args:
|
|
uuid: The UUID to search
|
|
Returns:
|
|
Optional[tuple[int, dict]]
|
|
"""
|
|
for x, item in enumerate(self.active_playlist):
|
|
if item.get('uuid') == uuid:
|
|
return (x, item)
|
|
return None
|
|
|
|
async def _ls_skip(self) -> bool:
|
|
"""
|
|
Ask LiquidSoap server to skip to the next track
|
|
Args:
|
|
None
|
|
Returns:
|
|
bool
|
|
"""
|
|
try:
|
|
async with ClientSession() as session:
|
|
async with session.get(f"{self.ls_uri}/next",
|
|
timeout=ClientTimeout(connect=2, sock_read=2)) as request:
|
|
request.raise_for_status()
|
|
text: Optional[str] = await request.text()
|
|
return text == "OK"
|
|
except Exception as e:
|
|
logging.debug("Skip failed: %s", str(e))
|
|
|
|
return False # failsafe
|
|
|
|
async def get_ai_song_info(self, artist: str,
|
|
song: str) -> Optional[str]:
|
|
"""
|
|
Get AI Song Info
|
|
Args:
|
|
artist (str)
|
|
song (str)
|
|
Returns:
|
|
Optional[str]
|
|
"""
|
|
prompt: str = f" am going to listen to {song} by {artist}."
|
|
response: Optional[str] = await self.gpt.get_completion(prompt)
|
|
if not response:
|
|
logging.critical("No response received from GPT?")
|
|
return None
|
|
return response
|
|
|
|
async def webhook_song_change(self, track: dict) -> None:
|
|
try:
|
|
"""
|
|
Handles Song Change Outbounds (Webhooks)
|
|
Args:
|
|
track (dict)
|
|
Returns:
|
|
None
|
|
"""
|
|
|
|
|
|
# First, send track info
|
|
friendly_track_start: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['start']))
|
|
friendly_track_end: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['end']))
|
|
hook_data: dict = {
|
|
'username': 'serious.FM',
|
|
"embeds": [{
|
|
"title": "Now Playing",
|
|
"description": f'## {track['song']}\nby\n## {track['artist']}',
|
|
"color": 0x30c56f,
|
|
"thumbnail": {
|
|
"url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{int(time.time())}",
|
|
},
|
|
"fields": [
|
|
{
|
|
"name": "Duration",
|
|
"value": self.duration_conv(track['duration']),
|
|
"inline": True,
|
|
},
|
|
{
|
|
"name": "Genre",
|
|
"value": track['genre'] if track['genre'] else 'Unknown',
|
|
"inline": True,
|
|
},
|
|
{
|
|
"name": "Filetype",
|
|
"value": track['file_path'].rsplit(".", maxsplit=1)[1],
|
|
"inline": True,
|
|
},
|
|
{
|
|
"name": "Higher Res",
|
|
"value": "[stream/icecast](https://relay.sfm.codey.lol/aces.ogg) | [web player](https://codey.lol/radio)",
|
|
"inline": True,
|
|
}
|
|
]
|
|
}]
|
|
}
|
|
|
|
sfm_hook: str = self.webhooks['sfm'].get('hook')
|
|
async with ClientSession() as session:
|
|
async with await session.post(sfm_hook, json=hook_data,
|
|
timeout=ClientTimeout(connect=5, sock_read=5), headers={
|
|
'content-type': 'application/json; charset=utf-8',}) as request:
|
|
request.raise_for_status()
|
|
|
|
# Next, AI feedback
|
|
|
|
ai_response: Optional[str] = await self.get_ai_song_info(track['artist'],
|
|
track['song'])
|
|
if not ai_response:
|
|
return
|
|
|
|
hook_data = {
|
|
'username': 'GPT',
|
|
"embeds": [{
|
|
"title": "AI Feedback",
|
|
"color": 0x35d0ff,
|
|
"description": ai_response.strip(),
|
|
}]
|
|
}
|
|
|
|
ai_hook: str = self.webhooks['gpt'].get('hook')
|
|
async with ClientSession() as session:
|
|
async with await session.post(ai_hook, json=hook_data,
|
|
timeout=ClientTimeout(connect=5, sock_read=5), headers={
|
|
'content-type': 'application/json; charset=utf-8',}) as request:
|
|
request.raise_for_status()
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
|
|
|