# api/utils/radio_util.py

# 504 lines
# 21 KiB
# Python

import logging
import traceback
import time
import datetime
import os
from uuid import uuid4 as uuid
from typing import Union, Optional, Iterable
from aiohttp import ClientSession, ClientTimeout
import regex
from regex import Pattern
import aiosqlite as sqlite3
import gpt
import music_tag # type: ignore
from endpoints.constructors import RadioException
# Matches any run of two or more whitespace characters. Used with
# .sub(" ", ...) below to collapse repeated whitespace in artist/song/album
# strings read from the track DB.
double_space: Pattern = regex.compile(r"\s{2,}")
"""
TODO:
- Album art rework
- get_genre should only be called once per load_playlist; rework get_genre to (optionally) accept a list of artists
and (optionally) return a list instead of a str
- Allow tracks to be queried by genre again; this is currently not possible,
as genre was moved out of track_file_map and into artist_genre_map
- Ask GPT when we encounter an untagged artist (no genre defined); this tedious task needs automation
- etc.
"""
class RadioUtil:
"""
Radio Utils
"""
def __init__(self, constants) -> None:
    """
    Initialise radio state: GPT client, DB locations, play queue, webhooks
    Args:
        constants: Project constants object; LS_URI, GPT_WEBHOOK and
            SFM_WEBHOOK are read from it
    Returns:
        None
    """
    self.constants = constants
    self.gpt = gpt.GPT(self.constants)
    self.ls_uri: str = self.constants.LS_URI

    # Loadable SQLite extensions; search_playlist loads each of these
    # before running its editdist3() query.
    self.sqlite_exts: list[str] = [
        "/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
    ]

    # All three SQLite databases live in the same directory.
    db_dir = os.path.join("/usr/local/share", "sqlite_dbs")
    self.active_playlist_path: str = os.path.join(db_dir, "track_file_map.db")
    self.artist_genre_db_path: str = os.path.join(db_dir, "artist_genre_map.db")
    self.album_art_db_path: str = os.path.join(db_dir, "track_album_art.db")

    self.active_playlist_name = "default"  # not used
    self.active_playlist: list[dict] = []

    # Placeholder "now playing" record: text fields first, then the
    # numeric/nullable ones (key order matches the original literal).
    now_playing: dict = dict.fromkeys(("artist", "song", "album", "genre"), "N/A")
    now_playing.update(
        {
            "artistsong": "N/A - N/A",
            "duration": 0,
            "start": 0,
            "end": 0,
            "file_path": None,
            "id": None,
        }
    )
    self.now_playing: dict = now_playing

    # Outbound webhook targets, keyed by purpose.
    self.webhooks: dict = {
        "gpt": {"hook": self.constants.GPT_WEBHOOK},
        "sfm": {"hook": self.constants.SFM_WEBHOOK},
    }
def duration_conv(self, s: Union[int, float]) -> str:
    """
    Render a duration in seconds as h:m:s, dropping any fractional part
    Args:
        s (Union[int, float]): Number of seconds
    Returns:
        str: str(timedelta) with everything from the first "." removed
    """
    elapsed = datetime.timedelta(seconds=s)
    # str(timedelta) appends ".ffffff" when microseconds are non-zero;
    # keep only the text before that separator.
    whole_part, _sep, _fraction = str(elapsed).partition(".")
    return whole_part
async def trackdb_typeahead(self, query: str) -> Optional[list[str]]:
    """
    Query track db for typeahead
    Args:
        query (str): The search query (matched as a substring of "artist - song")
    Returns:
        Optional[list[str]]: Up to 30 "artist - song" strings, or None when
            the query is empty
    """
    if not query:
        return None
    # Built with implicit string concatenation (explicit spaces between
    # clauses) rather than backslash continuations, so the SQL text does not
    # depend on source indentation. String literals use single quotes:
    # SQLite treats double-quoted text as an identifier first and only falls
    # back to a string literal as a legacy quirk.
    db_query: str = (
        "SELECT DISTINCT(LOWER(TRIM(artist) || ' - ' || TRIM(song))), "
        "(TRIM(artist) || ' - ' || TRIM(song)) AS artistsong "
        "FROM tracks WHERE artistsong LIKE ? LIMIT 30"
    )
    db_params: tuple[str] = (f"%{query}%",)
    async with sqlite3.connect(self.active_playlist_path, timeout=1) as _db:
        _db.row_factory = sqlite3.Row
        async with _db.execute(db_query, db_params) as _cursor:
            result = await _cursor.fetchall()
            return [str(r["artistsong"]) for r in result]
async def search_playlist(
    self,
    artistsong: Optional[str] = None,
    artist: Optional[str] = None,
    song: Optional[str] = None,
) -> bool:
    """
    Search for track, add it up next in play queue if found
    Args:
        artistsong (Optional[str]): "Artist - Song" combo to search (cannot be combined with artist/song)
        artist (Optional[str]): Artist to search (requires song; cannot be combined with artistsong)
        song (Optional[str]): Song to search (requires artist; cannot be combined with artistsong)
    Returns:
        bool: True if a match was queued; False if nothing matched, the
            artistsong value was malformed, or the search failed
    Raises:
        RadioException: If artistsong is combined with artist/song, or if
            neither artistsong nor a complete artist + song pair is given
    """
    if artistsong and (artist or song):
        raise RadioException("Cannot search using combination provided")
    if not artistsong and (not artist or not song):
        raise RadioException("No query provided")
    try:
        # Single-quoted SQL string literals: double quotes denote identifiers
        # in SQL and are only accepted as strings by a SQLite legacy quirk.
        # NOTE(review): the WHERE clause compares against "artist song" while
        # the ORDER BY compares against "artist - song"; kept as-is.
        search_query: str = (
            "SELECT id, artist, song, (artist || ' - ' || song) AS artistsong, "
            "album, file_path, duration FROM tracks "
            "WHERE editdist3((lower(artist) || ' ' || lower(song)), (? || ' ' || ?)) "
            "<= 410 ORDER BY editdist3((lower(artist) || ' ' || lower(song)), ?) "
            "ASC LIMIT 1"
        )
        if artistsong:
            search_artist, separator, search_song = artistsong.partition(" - ")
            if not separator:
                # No " - " delimiter: there is no artist/song pair to search.
                logging.info(
                    "search_playlist:: Malformed artistsong (expected 'Artist - Song'): %s",
                    artistsong,
                )
                return False
        else:
            search_artist, search_song = artist, song
            artistsong = f"{search_artist} - {search_song}"
        search_params = (
            search_artist.lower(),
            search_song.lower(),
            artistsong.lower(),
        )
        async with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn:
            # editdist3() comes from a loadable extension listed in sqlite_exts.
            await db_conn.enable_load_extension(True)
            for ext in self.sqlite_exts:
                await db_conn.load_extension(ext)
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(
                search_query, search_params
            ) as db_cursor:
                result = await db_cursor.fetchone()
                if not result or not isinstance(result, sqlite3.Row):
                    return False
                clean_artist: str = double_space.sub(" ", result["artist"].strip())
                push_obj: dict = {
                    "id": result["id"],
                    "uuid": str(uuid().hex),
                    "artist": clean_artist,
                    "song": double_space.sub(" ", result["song"].strip()),
                    # "album" is included so queued items have the same keys
                    # as the items built by load_playlist; an empty string
                    # stands in for a NULL album.
                    "album": double_space.sub(
                        " ", (result["album"] or "").strip()
                    ),
                    "artistsong": result["artistsong"].strip(),
                    "genre": await self.get_genre(clean_artist),
                    "file_path": result["file_path"],
                    "duration": result["duration"],
                }
                # Front of the list == next up in the play queue.
                self.active_playlist.insert(0, push_obj)
                return True
    except Exception as e:
        logging.critical("search_playlist:: Search error occurred: %s", str(e))
        traceback.print_exc()
        return False
async def add_genre(self, artist: str, genre: str) -> bool:
    """
    Add artist/genre pairing to DB
    Args:
        artist (str): Artist name
        genre (str): Genre to associate with the artist
    Returns:
        bool: True if the insert statement ran and was committed, False on error
    """
    try:
        async with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
            query: str = "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
            params: tuple[str, str] = (artist, genre)
            res = await _db.execute_insert(query, params)
            # The connection context manager only closes the connection; an
            # uncommitted INSERT would be rolled back, so commit explicitly.
            await _db.commit()
            return bool(res)
    except Exception as e:
        logging.info("Failed to store artist/genre pair: %s/%s (%s)",
                     artist, genre, str(e))
        traceback.print_exc()
        return False
async def get_genre(self, artist: str) -> str:
    """
    Retrieve Genre for given Artist
    Args:
        artist (str): The artist to query (matched as a substring via LIKE)
    Returns:
        str: The stored genre, or "Not Found" when the artist has no entry
            or the lookup fails
    """
    try:
        artist = artist.strip()
        query: str = "SELECT genre FROM artist_genre WHERE artist LIKE ?"
        # A single % on each side: inside an f-string "%%" is two literal
        # percent signs, not an escape.
        params: tuple[str] = (f"%{artist}%",)
        async with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
            _db.row_factory = sqlite3.Row
            async with await _db.execute(query, params) as _cursor:
                res = await _cursor.fetchone()
                if not res:
                    # An untagged artist is an expected outcome, so report it
                    # without raising and printing a traceback.
                    logging.info(
                        "Failed to look up genre for artist: %s (%s)",
                        artist,
                        f"Could not locate {artist} in artist_genre_map db.",
                    )
                    return "Not Found"
                return res["genre"]
    except Exception as e:
        logging.info("Failed to look up genre for artist: %s (%s)", artist, str(e))
        traceback.print_exc()
        return "Not Found"
async def load_playlist(self) -> None:
    """
    Load Playlist

    Clears the active playlist, then repopulates it from the track DB in
    random order. Genre is looked up once per distinct artist. Failures are
    logged and leave the playlist empty.
    Returns:
        None
    """

    def _clean(value: Optional[str]) -> str:
        """Collapse repeated whitespace and trim; a NULL column becomes ''."""
        return double_space.sub(" ", value or "").strip()

    try:
        logging.info("Loading playlist...")
        self.active_playlist.clear()
        # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
        # GROUP BY artistdashsong ORDER BY RANDOM()'

        # ACTIVE QUERY: every track, grouped by "artist - song", random order.
        # Single-quoted SQL string literals; double quotes denote identifiers.
        db_query: str = (
            "SELECT distinct(LOWER(TRIM(artist)) || ' - ' || LOWER(TRIM(song))), "
            "(TRIM(artist) || ' - ' || TRIM(song)) AS artistdashsong, "
            "id, artist, song, album, file_path, duration FROM tracks "
            "GROUP BY artistdashsong ORDER BY RANDOM()"
        )

        # PRESET: LIMITED TO ONE/SMALL SUBSET OF GENRES
        # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
        # WHERE (artist LIKE "%winds of plague%" OR artist LIKE "%acacia st%" OR artist LIKE "%suicide si%" OR artist LIKE "%in dying%") AND (NOT song LIKE "%(live%") ORDER BY RANDOM()' #ORDER BY artist DESC, album ASC, song ASC'

        # PRESETS: LIMITED TO ONE/SOME ARTISTS...
        # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
        # WHERE (artist LIKE "%rise against%" OR artist LIKE "%i prevail%" OR artist LIKE "%volumes%" OR artist LIKE "%movements%" OR artist LIKE "%woe%" OR artist LIKE "%smittyztop%" OR artist LIKE "%chunk! no,%" OR artist LIKE "%fame on fire%" OR artist LIKE "%our last night%" OR artist LIKE "%animal in me%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'
        # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
        # WHERE (artist LIKE "%sullivan king%" OR artist LIKE "%kayzo%" OR artist LIKE "%adventure club%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'
        # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
        # WHERE (artist LIKE "%akira the don%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'

        async with sqlite3.connect(
            f"file:{self.active_playlist_path}?mode=ro", uri=True, timeout=15
        ) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(db_query) as db_cursor:
                results = await db_cursor.fetchall()
                # get_genre opens its own DB connection on every call, so
                # remember the answer per artist instead of asking per track.
                genre_by_artist: dict[str, str] = {}
                playlist: list[dict] = []
                for r in results:
                    artist: str = _clean(r["artist"])
                    if artist not in genre_by_artist:
                        genre_by_artist[artist] = await self.get_genre(artist)
                    playlist.append(
                        {
                            "uuid": str(uuid().hex),
                            "id": r["id"],
                            "artist": artist,
                            "song": _clean(r["song"]),
                            "album": _clean(r["album"]),
                            "genre": genre_by_artist[artist],
                            "artistsong": _clean(r["artistdashsong"]),
                            "file_path": r["file_path"],
                            "duration": r["duration"],
                        }
                    )
                self.active_playlist = playlist
                logging.info(
                    "Populated active playlists with %s items",
                    len(self.active_playlist),
                )
    except Exception as e:
        logging.info("Playlist load failed: %s", str(e))
        traceback.print_exc()
async def cache_album_art(self, track_id: int, file_path: str) -> None:
    """
    Cache Album Art to SQLite DB
    Args:
        track_id (int): Track ID to update
        file_path (str): Path to file, for artwork extraction
    Returns:
        None
    """
    try:
        logging.info(
            "cache_album_art: Attempting to store album art for track_id: %s",
            track_id,
        )
        tagger = music_tag.load_file(file_path)
        artwork = tagger["artwork"].first
        # NOTE(review): `first` is presumed to be None when the file has no
        # embedded artwork -- confirm against music_tag. Skip the insert in
        # that case instead of failing on `.data`.
        if artwork is None:
            logging.info(
                "cache_album_art: No embedded artwork found for track_id: %s",
                track_id,
            )
            return
        album_art = artwork.data
        async with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
            async with await db_conn.execute(
                "INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES(?, ?)",
                (
                    track_id,
                    album_art,
                ),
            ):
                await db_conn.commit()
    except Exception:
        # `except Exception` rather than a bare `except:` so task cancellation
        # (asyncio.CancelledError) and KeyboardInterrupt still propagate.
        traceback.print_exc()
async def get_album_art(self, track_id: int) -> Optional[bytes]:
    """
    Get Album Art
    Args:
        track_id (int): Track ID to query
    Returns:
        Optional[bytes]: The stored album_art value, or None when no row
            exists or the lookup fails
    """
    try:
        async with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            query: str = "SELECT album_art FROM album_art WHERE track_id = ?"
            query_params: tuple[int] = (track_id,)
            async with await db_conn.execute(query, query_params) as db_cursor:
                result = await db_cursor.fetchone()
                if not result or not isinstance(result, sqlite3.Row):
                    return None
                return result["album_art"]
    except Exception:
        # `except Exception` rather than a bare `except:` so task cancellation
        # (asyncio.CancelledError) and KeyboardInterrupt still propagate.
        traceback.print_exc()
        return None
def get_queue_item_by_uuid(self, _uuid: str) -> Optional[tuple[int, dict]]:
    """
    Locate a queue entry by its UUID
    Args:
        _uuid (str): The UUID to search for
    Returns:
        Optional[tuple[int, dict]]: (position in active_playlist, entry) for
            the first entry whose "uuid" equals _uuid, or None if none does
    """
    candidates = (
        (position, entry)
        for position, entry in enumerate(self.active_playlist)
        if entry.get("uuid") == _uuid
    )
    # First hit wins; None when the generator is exhausted without a match.
    return next(candidates, None)
async def _ls_skip(self) -> bool:
    """
    Request a skip to the next track from the LiquidSoap server
    Args:
        None
    Returns:
        bool: True only when the server answers the /next request with the
            exact body "OK"; False on any other body or on any error
    """
    skipped: bool = False
    try:
        async with ClientSession() as http:
            skip_call = http.get(
                f"{self.ls_uri}/next",
                timeout=ClientTimeout(connect=2, sock_read=2),
            )
            async with skip_call as reply:
                reply.raise_for_status()
                body: Optional[str] = await reply.text()
        # Only reached when the request and both context exits succeeded.
        skipped = body == "OK"
    except Exception as e:
        logging.debug("Skip failed: %s", str(e))
    return skipped  # stays False unless the server confirmed (failsafe)
async def get_ai_song_info(self, artist: str, song: str) -> Optional[str]:
    """
    Get AI Song Info
    Args:
        artist (str): Artist of the track
        song (str): Title of the track
    Returns:
        Optional[str]: The completion returned by the GPT client, or None
            when it returns nothing
    """
    # The original literal began with " am going to ..."; the leading "I"
    # appears to have been dropped, so it is restored here.
    prompt: str = f"I am going to listen to {song} by {artist}."
    response: Optional[str] = await self.gpt.get_completion(prompt)
    if not response:
        logging.critical("No response received from GPT?")
        return None
    return response
async def webhook_song_change(self, track: dict) -> None:
    """
    Handles Song Change Outbounds (Webhooks)

    Posts a "Now Playing" embed to the "sfm" webhook, then asks GPT about the
    track and posts its answer to the "gpt" webhook. Any failure is logged
    and swallowed; a failed first post also skips the AI feedback.
    Args:
        track (dict): Queue item; reads "song", "artist", "id", "duration"
            and "file_path", plus "genre" and "album" when present
    Returns:
        None
    """

    async def _post(hook_url: str, payload: dict) -> None:
        """POST one JSON payload to a webhook; raises on an HTTP error status."""
        async with ClientSession() as session:
            async with session.post(
                hook_url,
                json=payload,
                timeout=ClientTimeout(connect=5, sock_read=5),
                headers={
                    "content-type": "application/json; charset=utf-8",
                },
            ) as request:
                request.raise_for_status()

    try:
        # First, send track info
        # TODO:
        # Review friendly_track_start and friendly_track_end, not currently in use
        # friendly_track_start: str = time.strftime(
        #     "%Y-%m-%d %H:%M:%S", time.localtime(track["start"])
        # )
        # friendly_track_end: str = time.strftime(
        #     "%Y-%m-%d %H:%M:%S", time.localtime(track["end"])
        # )

        # splitext copes with paths that have no extension (rsplit(".")[1]
        # raised IndexError); fall back to "Unknown" rather than sending an
        # empty field value.
        filetype: str = (
            os.path.splitext(track.get("file_path") or "")[1].lstrip(".")
            or "Unknown"
        )
        hook_data: dict = {
            "username": "serious.FM",
            "embeds": [
                {
                    "title": "Now Playing",
                    "description": f"## {track['song']}\nby\n## {track['artist']}",
                    "color": 0x30C56F,
                    "thumbnail": {
                        # The trailing timestamp makes each URL unique.
                        "url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{int(time.time())}",
                    },
                    "fields": [
                        {
                            "name": "Duration",
                            "value": self.duration_conv(track["duration"]),
                            "inline": True,
                        },
                        {
                            "name": "Genre",
                            # .get(): tolerate items that lack the key.
                            "value": track.get("genre") or "Unknown",
                            "inline": True,
                        },
                        {
                            "name": "Filetype",
                            "value": filetype,
                            "inline": True,
                        },
                        {
                            "name": "Higher Res",
                            "value": "[stream/icecast](https://stream.codey.lol/sfm.ogg) | [web player](https://codey.lol/radio)",
                            "inline": True,
                        },
                        {
                            "name": "Album",
                            # Items queued via search_playlist were built
                            # without an "album" key, so don't index directly.
                            "value": track.get("album") or "Unknown",
                        },
                    ],
                }
            ],
        }
        sfm_hook: str = self.webhooks["sfm"].get("hook")
        await _post(sfm_hook, hook_data)

        # Next, AI feedback
        ai_response: Optional[str] = await self.get_ai_song_info(
            track["artist"], track["song"]
        )
        if not ai_response:
            return
        hook_data = {
            "username": "GPT",
            "embeds": [
                {
                    "title": "AI Feedback",
                    "color": 0x35D0FF,
                    "description": ai_response.strip(),
                }
            ],
        }
        ai_hook: str = self.webhooks["gpt"].get("hook")
        await _post(ai_hook, hook_data)
    except Exception as e:
        logging.info("Webhook error occurred: %s", str(e))
        traceback.print_exc()