633 lines
24 KiB
Python
633 lines
24 KiB
Python
import logging
|
|
import traceback
|
|
import time
|
|
import datetime
|
|
import os
|
|
import random
|
|
from uuid import uuid4 as uuid
|
|
from typing import Union, Optional, Iterable
|
|
from aiohttp import ClientSession, ClientTimeout
|
|
import regex
|
|
from regex import Pattern
|
|
import sqlite3
|
|
import gpt
|
|
import music_tag # type: ignore
|
|
from rapidfuzz import fuzz
|
|
from endpoints.constructors import RadioException
|
|
|
|
# Matches any run of two or more whitespace characters (not only spaces).
double_space: Pattern = regex.compile(r"\s{2,}")
|
|
# Matches every character that is not an ASCII letter or digit.
non_alnum: Pattern = regex.compile(r"[^a-zA-Z0-9]")
|
|
|
|
"""
|
|
TODO:
|
|
- get_genre should only be called once for load_playlist; rework get_genre to (optionally) accept a list of artists
  and (optionally) return a list instead of a str
|
|
- Ask GPT when we encounter an untagged (no genre defined) artist, automation is needed for this tedious task
|
|
- etc..
|
|
"""
|
|
|
|
|
|
class RadioUtil:
    """
    Radio Utils

    Helpers around the in-memory radio playlist: SQLite-backed track search
    and typeahead, artist/genre lookups, album art caching, LiquidSoap skip
    requests and song-change webhooks.
    """
|
|
|
|
def __init__(self, constants, loop) -> None:
    """
    Store configuration, database locations and the initial playback state

    Args:
        constants: Constants object; LS_URI, GPT_WEBHOOK and SFM_WEBHOOK are read from it here
        loop: Event loop object, kept on the instance for later use
    Returns:
        None
    """
    self.constants = constants
    self.loop = loop
    self.gpt = gpt.GPT(self.constants)
    self.ls_uri: str = self.constants.LS_URI

    # SQLite extension(s) loaded before running edit-distance searches
    self.sqlite_exts: list[str] = [
        "/home/kyle/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
    ]

    # Every database lives in the same directory
    db_dir: str = os.path.join("/mnt/data/share", "sqlite_dbs")
    self.active_playlist_path: str = os.path.join(db_dir, "track_file_map.db")
    self.artist_genre_db_path: str = os.path.join(db_dir, "artist_genre_map.db")
    self.album_art_db_path: str = os.path.join(db_dir, "track_album_art.db")

    # No genres listed by default.
    # Example entries: "post-hardcore", "post hardcore", "metalcore",
    # "deathcore", "edm", "electronic", "hard rock"
    self.playback_genres: list[str] = []

    self.active_playlist: list[dict] = []
    self.playlist_loaded: bool = False

    # Placeholder metadata used until a real track is playing
    placeholder: str = "N/A"
    self.now_playing: dict = {
        "artist": placeholder,
        "song": placeholder,
        "album": placeholder,
        "genre": placeholder,
        "artistsong": f"{placeholder} - {placeholder}",
        "duration": 0,
        "start": 0,
        "end": 0,
        "file_path": None,
        "id": None,
    }

    self.webhooks: dict = {
        name: {"hook": hook_url}
        for name, hook_url in (
            ("gpt", self.constants.GPT_WEBHOOK),
            ("sfm", self.constants.SFM_WEBHOOK),
        )
    }
|
|
|
|
def duration_conv(self, s: Union[int, float]) -> str:
    """
    Format a duration given in seconds as h:mm:ss, dropping fractional seconds

    Args:
        s (Union[int, float]): seconds to convert
    Returns:
        str
    """
    formatted: str = str(datetime.timedelta(seconds=s))
    # Fractional seconds, when present, follow the first "."
    whole_seconds, _, _ = formatted.partition(".")
    return whole_seconds
|
|
|
|
def trackdb_typeahead(self, query: str) -> Optional[list[str]]:
    """
    Query track db for typeahead

    Returns up to 30 distinct "artist - song" strings that contain the
    query (SQL LIKE substring match).

    Args:
        query (str): The search query
    Returns:
        Optional[list[str]]: Matching "artist - song" strings, or None when the query is empty
    """
    if not query:
        return None

    # SQL string literals use single quotes: double-quoted literals are only
    # accepted when SQLite's legacy compatibility mode is enabled.
    db_query: str = (
        "SELECT DISTINCT LOWER(TRIM(artist) || ' - ' || TRIM(song)), "
        "(TRIM(artist) || ' - ' || TRIM(song)) AS artistsong "
        "FROM tracks WHERE artistsong LIKE ? LIMIT 30"
    )
    db_params: tuple[str] = (f"%{query}%",)

    _db = sqlite3.connect(self.active_playlist_path, timeout=1)
    try:
        _db.row_factory = sqlite3.Row
        result: Iterable[sqlite3.Row] = _db.execute(db_query, db_params).fetchall()
    finally:
        # The connection's context manager only handles the transaction,
        # so the connection has to be closed explicitly.
        _db.close()
    return [str(r["artistsong"]) for r in result]
|
|
|
|
def datatables_search(self, filter: str) -> Optional[list[dict]]:
    """DataTables Search

    An item matches when the filter, reduced to ASCII letters and digits, is
    a substring of the item's similarly reduced "artistsong", or when the
    filter scores at least 85 with fuzz.ratio against the artist, song or
    album. Items missing artist, song or artistsong are skipped.

    Args:
        filter (str): The filter query to fuzzy match with

    Returns:
        list[dict]: List of matching playlist items (if any are found)
    """
    filter = filter.strip().lower()
    # Loop-invariant: the reduced filter is identical for every item
    filter_alnum: str = non_alnum.sub("", filter)
    matched: list[dict] = []
    for item in self.active_playlist:
        artist: Optional[str] = item.get("artist")
        song: Optional[str] = item.get("song")
        artistsong: Optional[str] = item.get("artistsong")
        album: Optional[str] = item.get("album")
        if not artist or not song or not artistsong:
            continue
        if filter_alnum in non_alnum.sub("", artistsong).lower():
            matched.append(item)
            continue
        # The filter was lowercased above; lowercase the candidates as well so
        # that capitalisation alone cannot push a score below the threshold.
        candidates = (artist, song, album or "")
        if any(fuzz.ratio(filter, str(c).lower()) >= 85 for c in candidates):
            matched.append(item)
    return matched
|
|
|
|
def search_playlist(
    self,
    artistsong: Optional[str] = None,
    artist: Optional[str] = None,
    song: Optional[str] = None,
) -> bool:
    """
    Search for track, add it up next in play queue if found

    Provide either artistsong, or both artist and song. The closest match by
    editdist3 is inserted at the front of the active playlist.

    Args:
        artistsong (Optional[str]): "Artist - Song" combo to search; cannot be combined with artist/song
        artist (Optional[str]): Artist to search; requires song
        song (Optional[str]): Song to search; requires artist
    Returns:
        bool: True if a track was found and queued, False otherwise (including on errors)
    Raises:
        RadioException: if artistsong is combined with artist/song, or no complete query is given
    """
    if artistsong and (artist or song):
        raise RadioException("Cannot search using combination provided")
    if not artistsong and (not artist or not song):
        raise RadioException("No query provided")
    try:
        # SQL string literals use single quotes: double-quoted literals are
        # only accepted when SQLite's legacy compatibility mode is enabled.
        search_query: str = (
            "SELECT id, artist, song, (artist || ' - ' || song) AS artistsong, "
            "album, file_path, duration FROM tracks "
            "WHERE editdist3((lower(artist) || ' ' || lower(song)), (? || ' ' || ?)) "
            "<= 410 ORDER BY editdist3((lower(artist) || ' ' || lower(song)), ?) "
            "ASC LIMIT 1"
        )
        if artistsong:
            # Raises ValueError (handled below) when there is no " - " separator
            search_artist, search_song = artistsong.split(" - ", maxsplit=1)
        else:
            search_artist, search_song = artist, song
            artistsong = f"{search_artist} - {search_song}"
        search_params = (
            search_artist.lower(),
            search_song.lower(),
            artistsong.lower(),
        )

        db_conn = sqlite3.connect(self.active_playlist_path, timeout=2)
        try:
            db_conn.enable_load_extension(True)
            for ext in self.sqlite_exts:
                db_conn.load_extension(ext)
            # Extension loading is only needed during setup; switch it back off
            db_conn.enable_load_extension(False)
            db_conn.row_factory = sqlite3.Row
            result: Optional[sqlite3.Row | bool] = db_conn.execute(
                search_query, search_params
            ).fetchone()
        finally:
            # The connection's context manager only handles the transaction,
            # so the connection has to be closed explicitly.
            db_conn.close()

        if not result or not isinstance(result, sqlite3.Row):
            return False

        found_artist: str = double_space.sub(" ", result["artist"].strip())
        push_obj: dict = {
            "id": result["id"],
            "uuid": uuid().hex,
            "artist": found_artist,
            "song": double_space.sub(" ", result["song"].strip()),
            "artistsong": result["artistsong"].strip(),
            "genre": self.get_genre(found_artist),
            "file_path": result["file_path"],
            "duration": result["duration"],
        }
        # Front of the list, so the track is up next
        self.active_playlist.insert(0, push_obj)
        return True
    except Exception as e:
        logging.critical("search_playlist:: Search error occurred: %s", str(e))
        traceback.print_exc()
        return False
|
|
|
|
def add_genre(self, artist: str, genre: str) -> bool:
    """
    Add artist/genre pairing to DB

    Runs an INSERT OR IGNORE, so an existing conflicting row is left untouched.

    Args:
        artist (str)
        genre (str)
    Returns:
        bool: True when the statement executed, False on any error
    """
    query: str = "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
    params: tuple[str, str] = (artist, genre)
    try:
        _db = sqlite3.connect(self.artist_genre_db_path, timeout=2)
        try:
            # `with _db` commits on success and rolls back on error; it does
            # not close the connection, hence the surrounding try/finally.
            with _db:
                res = _db.execute(query, params)
                # NOTE(review): lastrowid looks like it is an int after any
                # INSERT, including ignored ones, so this check likely never
                # fails - confirm whether rowcount was intended instead.
                if isinstance(res.lastrowid, int):
                    logging.debug(
                        "Query executed successfully for %s/%s, committing",
                        artist,
                        genre,
                    )
                    return True
                logging.debug(
                    "Failed to store artist/genre pair: %s/%s (res: %s)",
                    artist,
                    genre,
                    res,
                )
                return False
        finally:
            _db.close()
    except Exception as e:
        logging.info(
            "Failed to store artist/genre pair: %s/%s (%s)", artist, genre, str(e)
        )
        traceback.print_exc()
        return False
|
|
|
|
def add_genres(self, pairs: list[dict[str, str]]) -> bool:
    """
    (BATCH) Add artist/genre pairings to DB
    Expects list of dicts comprised of artist name (key), genre (value).
    A dict may hold several artist/genre entries; plain (artist, genre)
    sequences are accepted as well.

    Args:
        pairs (list[dict[str, str]]): Pairs of artist/genres to add, list of dicts
    Returns:
        bool: True if at least one statement executed and was committed, False otherwise
    """
    query: str = "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
    try:
        added_rows: int = 0
        _db = sqlite3.connect(self.artist_genre_db_path, timeout=2)
        try:
            for pair in pairs:
                # Unpacking a dict directly would yield its keys, not
                # (key, value), so dicts are expanded through .items().
                if isinstance(pair, dict):
                    entries = list(pair.items())
                else:
                    try:
                        artist, genre = pair
                    except (TypeError, ValueError):
                        logging.info(
                            "add_genres: Skipping malformed pair: %s", pair
                        )
                        continue
                    entries = [(artist, genre)]

                for artist, genre in entries:
                    try:
                        res = _db.execute(query, (artist, genre))
                    except sqlite3.Error as e:
                        # Best effort: one bad pair must not abort the batch
                        logging.info(
                            "Failed to store artist/genre pair: %s/%s (%s)",
                            artist,
                            genre,
                            str(e),
                        )
                        continue
                    # NOTE(review): lastrowid looks like it is an int after any
                    # INSERT, including ignored ones - confirm whether rowcount
                    # was intended for counting added rows.
                    if isinstance(res.lastrowid, int):
                        logging.debug(
                            "add_genres: Query executed successfully for %s/%s",
                            artist,
                            genre,
                        )
                        added_rows += 1
                    else:
                        logging.debug(
                            "Failed to store artist/genre pair: %s/%s (res: %s)",
                            artist,
                            genre,
                            res,
                        )
            if added_rows:
                logging.info("add_genres: Committing %s rows", added_rows)
                _db.commit()
                return True
            logging.info("add_genres: Failed (No rows added)")
            return False
        finally:
            # The connection's context manager would not close it either
            _db.close()
    except Exception as e:
        logging.info("Failed to store artist/genre pairs: %s", str(e))
        traceback.print_exc()
        return False
|
|
|
|
def get_genre(self, artist: str) -> str:
    """
    Retrieve Genre for given Artist

    The lookup is a case-insensitive substring match (LIKE %artist%); the
    first row returned by the database is used.

    Args:
        artist (str): The artist to query
    Returns:
        str: The stored genre, or "Not Found" when nothing matches or the lookup fails
    """
    try:
        artist = artist.strip()
        query: str = (
            "SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE"
        )
        # A single % on each side; doubled wildcards match the same rows
        params: tuple[str] = (f"%{artist}%",)
        _db = sqlite3.connect(self.artist_genre_db_path, timeout=2)
        try:
            _db.row_factory = sqlite3.Row
            res = _db.execute(query, params).fetchone()
        finally:
            # The connection's context manager only handles the transaction,
            # so the connection has to be closed explicitly.
            _db.close()
        if not res:
            return "Not Found"  # Exception suppressed
        return res["genre"]
    except Exception as e:
        logging.info("Failed to look up genre for artist: %s (%s)", artist, str(e))
        traceback.print_exc()
        return "Not Found"
|
|
|
|
def load_playlist(self) -> None:
    """
    Load Playlist

    Reads the tracks returned by constants.RADIO_DB_QUERY, shuffles them,
    drops duplicate artist/song combos, optionally restricts the result to
    self.playback_genres, then marks the playlist as loaded and asks
    LiquidSoap to skip to the next track. Any failure is logged, not raised.

    Returns:
        None
    """
    try:
        logging.info("Loading playlist...")
        self.active_playlist.clear()

        db_query: str = self.constants.RADIO_DB_QUERY

        db_conn = sqlite3.connect(
            f"file:{self.active_playlist_path}?mode=ro", uri=True, timeout=15
        )
        try:
            db_conn.row_factory = sqlite3.Row
            results: list[sqlite3.Row] = db_conn.execute(db_query).fetchall()
        finally:
            # The connection's context manager only handles the transaction,
            # so the connection has to be closed explicitly.
            db_conn.close()

        # One genre lookup per distinct artist instead of one per track
        genre_cache: dict[str, str] = {}
        playlist: list[dict] = []
        for r in results:
            artist: str = double_space.sub(" ", r["artist"]).strip()
            if artist not in genre_cache:
                genre_cache[artist] = self.get_genre(artist)
            playlist.append(
                {
                    "uuid": uuid().hex,
                    "id": r["id"],
                    "artist": artist,
                    "song": double_space.sub(" ", r["song"]).strip(),
                    "album": double_space.sub(" ", r["album"]).strip(),
                    "genre": genre_cache[artist],
                    "artistsong": double_space.sub(
                        " ", r["artistdashsong"]
                    ).strip(),
                    "file_path": r["file_path"],
                    "duration": r["duration"],
                }
            )
        self.active_playlist = playlist
        logging.info(
            "Populated active playlists with %s items",
            len(self.active_playlist),
        )

        random.shuffle(self.active_playlist)

        logging.info("Removing duplicate tracks...")
        self.active_playlist = self._dedupe_playlist(self.active_playlist)
        logging.info(
            "Duplicates removed.New playlist size: %s",
            len(self.active_playlist),
        )

        logging.info(
            "Playlist: %s",
            [str(a.get("artistsong", "")) for a in self.active_playlist],
        )

        if self.playback_genres:
            logging.info("Limiting playback genres")
            self.active_playlist = self._filter_by_genre(self.active_playlist)
            logging.info(
                "%s items remain for playback after filtering",
                len(self.active_playlist),
            )

        self.playlist_loaded = True
        self.loop.run_until_complete(self._ls_skip())
    except Exception as e:
        logging.info("Playlist load failed: %s", str(e))
        traceback.print_exc()


def _dedupe_playlist(self, playlist: list[dict]) -> list[dict]:
    """Return playlist without repeated artist/song combos, keeping the first occurrence of each."""
    seen: set[str] = set()
    unique: list[dict] = []
    # A new list is built rather than removing from the list being iterated,
    # which would skip the element following every removal.
    for item in playlist:
        artistsongabc: str = non_alnum.sub("", item.get("artistsong") or "")
        if not artistsongabc:
            # Items without a usable key cannot be compared, so they are kept
            logging.info("Missing artistsong: %s", item)
            unique.append(item)
            continue
        if artistsongabc in seen:
            continue
        seen.add(artistsongabc)
        unique.append(item)
    return unique


def _filter_by_genre(self, playlist: list[dict]) -> list[dict]:
    """Return the items whose genre contains any of self.playback_genres (case-insensitive substring match)."""
    wanted: list[str] = [genre.strip().lower() for genre in self.playback_genres]
    kept: list[dict] = []
    for item in playlist:
        item_genres: str = (item.get("genre") or "").strip().lower()
        if any(genre in item_genres for genre in wanted):
            kept.append(item)
    return kept
|
|
|
|
def cache_album_art(self, track_id: int, file_path: str) -> None:
    """
    Cache Album Art to SQLite DB

    Reads the first artwork entry from the file's tags and stores it with
    INSERT OR IGNORE, so an existing conflicting row is left untouched.
    Any failure is logged, not raised.

    Args:
        track_id (int): Track ID to update
        file_path (str): Path to file, for artwork extraction
    Returns:
        None
    """
    try:
        logging.info(
            "cache_album_art: Attempting to store album art for track_id: %s",
            track_id,
        )
        tagger = music_tag.load_file(file_path)
        album_art = tagger["artwork"].first.data
        db_conn = sqlite3.connect(self.album_art_db_path, timeout=2)
        try:
            # `with db_conn` commits on success and rolls back on error; it
            # does not close the connection, hence the try/finally.
            with db_conn:
                db_cursor = db_conn.execute(
                    "INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES(?, ?)",
                    (
                        track_id,
                        album_art,
                    ),
                )
            # NOTE(review): lastrowid looks like it is an int after any INSERT,
            # including ignored ones, so this branch is likely never taken.
            if not isinstance(db_cursor.lastrowid, int):
                logging.debug(
                    "No row inserted for track_id: %s w/ file_path: %s",
                    track_id,
                    file_path,
                )
        finally:
            db_conn.close()
    except Exception as e:
        logging.debug("cache_album_art Exception: %s", str(e))
        traceback.print_exc()
|
|
|
|
def get_album_art(self, track_id: int) -> Optional[bytes]:
    """
    Get Album Art

    Args:
        track_id (int): Track ID to query
    Returns:
        Optional[bytes]: The stored album art, or None when no row exists or the lookup fails
    """
    query: str = "SELECT album_art FROM album_art WHERE track_id = ?"
    query_params: tuple[int] = (track_id,)
    try:
        db_conn = sqlite3.connect(self.album_art_db_path, timeout=2)
        try:
            db_conn.row_factory = sqlite3.Row
            result: Optional[Union[sqlite3.Row, bool]] = db_conn.execute(
                query, query_params
            ).fetchone()
        finally:
            # The connection's context manager only handles the transaction,
            # so the connection has to be closed explicitly.
            db_conn.close()
        if not result or not isinstance(result, sqlite3.Row):
            return None
        return result["album_art"]
    except Exception as e:
        logging.debug("get_album_art Exception: %s", str(e))
        traceback.print_exc()
        return None
|
|
|
|
def get_queue_item_by_uuid(self, _uuid: str) -> Optional[tuple[int, dict]]:
    """
    Find the first queue item carrying the given UUID

    Args:
        _uuid (str): The UUID to search
    Returns:
        Optional[tuple[int, dict]]: (position in the active playlist, item), or None if absent
    """
    hits = (
        (position, entry)
        for position, entry in enumerate(self.active_playlist)
        if entry.get("uuid") == _uuid
    )
    # Only the first hit is wanted; None signals that nothing matched
    return next(hits, None)
|
|
|
|
async def _ls_skip(self) -> bool:
    """
    Request a skip to the next track from the LiquidSoap server

    Sends a GET to "<ls_uri>/next"; any error is logged at debug level.

    Args:
        None
    Returns:
        bool: True only if the response body is exactly "OK"
    """
    try:
        async with ClientSession() as session, session.get(
            f"{self.ls_uri}/next", timeout=ClientTimeout(connect=2, sock_read=2)
        ) as response:
            response.raise_for_status()
            body: Optional[str] = await response.text()
            return body == "OK"
    except Exception as e:
        logging.debug("Skip failed: %s", str(e))

    # Reached only when the request raised
    return False
|
|
|
|
async def get_ai_song_info(self, artist: str, song: str) -> Optional[str]:
    """
    Get AI Song Info

    Sends a short first-person statement about the track to self.gpt and
    returns the completion.

    Args:
        artist (str)
        song (str)
    Returns:
        Optional[str]: The completion text, or None when no response was received
    """
    # The prompt previously began with " am going to ..." (subject missing)
    prompt: str = f"I am going to listen to {song} by {artist}."
    response: Optional[str] = await self.gpt.get_completion(prompt)
    if not response:
        logging.critical("No response received from GPT?")
        return None
    return response
|
|
|
|
async def webhook_song_change(self, track: dict) -> None:
    """
    Handles Song Change Outbounds (Webhooks)

    Posts a "Now Playing" embed to the "sfm" webhook, then requests AI
    feedback for the track and, if any was returned, posts it to the "gpt"
    webhook. Any failure is logged, not raised.

    Args:
        track (dict): Track to announce; reads song, artist, id, duration,
            genre, file_path and (optionally) album
    Returns:
        None
    """
    try:
        # First, send track info
        # TODO: Review friendly_track_start and friendly_track_end
        # (formatted track["start"] / track["end"]), not currently in use
        await self._post_webhook(
            self.webhooks["sfm"].get("hook"),
            self._now_playing_payload(track),
        )

        # Next, AI feedback
        ai_response: Optional[str] = await self.get_ai_song_info(
            track["artist"], track["song"]
        )
        if not ai_response:
            return

        ai_payload: dict = {
            "username": "GPT",
            "embeds": [
                {
                    "title": "AI Feedback",
                    "color": 0x35D0FF,
                    "description": ai_response.strip(),
                }
            ],
        }
        await self._post_webhook(self.webhooks["gpt"].get("hook"), ai_payload)
    except Exception as e:
        logging.info("Webhook error occurred: %s", str(e))
        traceback.print_exc()


def _now_playing_payload(self, track: dict) -> dict:
    """Build the "Now Playing" webhook payload for the given track."""
    # Extension of the file name only; a missing path or extension falls back
    # to "Unknown" instead of raising and aborting the whole webhook.
    file_path: str = str(track.get("file_path") or "")
    filetype: str = os.path.splitext(file_path)[1].lstrip(".") or "Unknown"
    return {
        "username": "serious.FM",
        "embeds": [
            {
                "title": "Now Playing",
                "description": f"## {track['song']}\nby\n## {track['artist']}",
                "color": 0x30C56F,
                "thumbnail": {
                    # The current timestamp is appended as an extra query
                    # parameter (presumably a cache-buster - confirm)
                    "url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{int(time.time())}",
                },
                "fields": [
                    {
                        "name": "Duration",
                        "value": self.duration_conv(track["duration"]),
                        "inline": True,
                    },
                    {
                        "name": "Genre",
                        "value": track.get("genre") or "Unknown",
                        "inline": True,
                    },
                    {
                        "name": "Filetype",
                        "value": filetype,
                        "inline": True,
                    },
                    {
                        "name": "Higher Res",
                        "value": "[stream/icecast](https://stream.codey.lol/sfm.ogg) | [web player](https://codey.lol/radio)",
                        "inline": True,
                    },
                    {
                        "name": "Album",
                        # Also covers a present-but-empty album value
                        "value": track.get("album") or "Unknown",
                    },
                ],
            }
        ],
    }


async def _post_webhook(self, hook_url: str, payload: dict) -> None:
    """POST payload as JSON to hook_url; raises if the response status indicates an error."""
    async with ClientSession() as session:
        async with session.post(
            hook_url,
            json=payload,
            timeout=ClientTimeout(connect=5, sock_read=5),
            headers={
                "content-type": "application/json; charset=utf-8",
            },
        ) as response:
            response.raise_for_status()
|