Files
api/utils/radio_util.py

802 lines
33 KiB
Python
Raw Normal View History

2025-02-10 20:29:57 -05:00
import logging
import traceback
import time
import datetime
2025-02-11 20:01:07 -05:00
import os
import random
import asyncio
from uuid import uuid4 as uuid
from typing import Union, Optional, Iterable
2025-02-11 11:26:20 -05:00
from aiohttp import ClientSession, ClientTimeout
import regex
from regex import Pattern
import sqlite3
import gpt
import music_tag # type: ignore
2025-04-26 17:17:42 -04:00
from rapidfuzz import fuzz
2025-03-04 08:11:55 -05:00
from endpoints.constructors import RadioException
import redis.asyncio as redis
from redis.commands.search.query import Query # noqa
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # noqa
from redis.commands.search.field import TextField # noqa
from redis.commands.json.path import Path # noqa
from lyric_search.sources import private
2025-02-11 20:01:07 -05:00
double_space: Pattern = regex.compile(r"\s{2,}")
2025-04-26 17:17:42 -04:00
non_alnum: Pattern = regex.compile(r"[^a-zA-Z0-9]")
2025-02-10 20:29:57 -05:00
class RadioUtil:
2025-02-15 21:09:33 -05:00
"""
Radio Utils
"""
2025-04-26 19:47:12 -04:00
    def __init__(self, constants, loop) -> None:
        """
        Radio Utils.

        Args:
            constants: Application constants object (DB queries, webhook URLs, LS URI).
            loop: Asyncio event loop the radio runs on.
        """
        self.constants = constants
        self.loop = loop
        self.gpt = gpt.GPT(self.constants)
        # Base URI of the LiquidSoap control server (used by _ls_skip)
        self.ls_uri: str = self.constants.LS_URI
        self.redis_client = redis.Redis(password=private.REDIS_PW)
        # When True, load_playlists() removes duplicate tracks (keyed on file_path)
        self.DEDUPE_PLAYLISTS: bool = True
        # SQLite loadable extensions; spellfix1 provides editdist3() for fuzzy search_db
        self.sqlite_exts: list[str] = [
            "/home/kyle/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
        ]
        # DB paths: track/file map, artist->genre map, and cached album art
        self.playback_db_path: str = os.path.join(
            "/usr/local/share", "sqlite_dbs", "track_file_map.db"
        )
        self.artist_genre_db_path: str = os.path.join(
            "/usr/local/share", "sqlite_dbs", "artist_genre_map.db"
        )
        self.album_art_db_path: str = os.path.join(
            "/usr/local/share", "sqlite_dbs", "track_album_art.db"
        )
        # Per-station SQL used to (re)build playlists
        self.db_queries = {
            'main': self.constants.RADIO_DB_QUERY,
            'rap': self.constants.RADIO_DB_QUERY_RAP,
            'pop': self.constants.RADIO_DB_QUERY_POP,
            # 'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
            'rock': self.constants.RADIO_DB_QUERY_ROCK,
            'electronic': self.constants.RADIO_DB_QUERY_ELECTRONIC,
        }
        # Optional allow-list of genres applied to the "main" playlist in
        # load_playlists(); empty list disables the filtering entirely.
        self.playback_genres: list[str] = [
            # "metal",
            # # "hip hop",
            # "metalcore",
            # "deathcore",
            # "edm",
            # "electronic",
            # "post-hardcore",
            # "post hardcore",
            # # "hard rock",
            # # "rock",
            # # # "ska",
            # # "post punk",
            # # "post-punk",
            # # "pop punk",
            # # "pop-punk",
        ]
        # Known stations
        self.playlists: list = [
            "main",
            "rock",
            "rap",
            "electronic",
            "pop",
        ]
        # station -> list of track dicts (play queue); populated by load_playlists()
        self.active_playlist: dict[str, list[dict]] = {}
        self.playlists_loaded: bool = False
        # station -> currently-playing track metadata (placeholder values until
        # playback actually starts)
        self.now_playing: dict[str, dict] = {
            playlist: {
                "artist": "N/A",
                "song": "N/A",
                "album": "N/A",
                "genre": "N/A",
                "artistsong": "N/A - N/A",
                "duration": 0,
                "start": 0,
                "end": 0,
                "file_path": None,
                "id": None,
            } for playlist in self.playlists
        }
        # Discord-style outbound webhooks; lastRun supports throttling
        self.webhooks: dict = {
            "gpt": {
                "hook": self.constants.GPT_WEBHOOK,
                "lastRun": None,
            },
            "sfm": {
                "hook": self.constants.SFM_WEBHOOK,
                "lastRun": None,
            },
        }
def duration_conv(self, s: Union[int, float]) -> str:
2025-02-10 20:29:57 -05:00
"""
Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
Args:
2025-02-15 21:09:33 -05:00
s (Union[int, float]): seconds to convert
2025-02-10 20:29:57 -05:00
Returns:
str
"""
return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]
2025-04-26 21:27:55 -04:00
def trackdb_typeahead(self, query: str) -> Optional[list[str]]:
"""
Query track db for typeahead
Args:
query (str): The search query
Returns:
Optional[list[str]]
"""
2025-02-16 13:54:28 -05:00
if not query:
return None
2025-07-17 06:55:16 -04:00
with sqlite3.connect(self.playback_db_path, timeout=1) as _db:
2025-02-16 13:54:28 -05:00
_db.row_factory = sqlite3.Row
2025-02-18 06:55:47 -05:00
db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\
(TRIM(artist) || " - " || TRIM(song)) as artistsong FROM tracks WHERE\
artistsong LIKE ? LIMIT 30"""
2025-02-16 13:54:28 -05:00
db_params: tuple[str] = (f"%{query}%",)
_cursor = _db.execute(db_query, db_params)
result: Iterable[sqlite3.Row] = _cursor.fetchall()
out_result = [str(r["artistsong"]) for r in result]
return out_result
2025-07-17 06:55:16 -04:00
def datatables_search(self,
filter: str,
station: str = "main") -> Optional[list[dict]]:
2025-04-26 17:17:42 -04:00
"""DataTables Search
Args:
filter (str): The filter query to fuzzy match with
Returns:
list[dict]: List of matching playlist items (if any are found)
2025-04-26 17:17:42 -04:00
"""
filter = filter.strip().lower()
matched: list[dict] = []
2025-07-17 06:55:16 -04:00
for item in self.active_playlist[station]:
2025-08-15 13:31:15 -04:00
artist: str = item.get("artist", "")
song: str = item.get("song", "")
artistsong: str = item.get("artistsong", "")
album: str = item.get("album", "")
2025-04-26 17:17:42 -04:00
if not artist or not song or not artistsong:
continue
if non_alnum.sub("", filter) in non_alnum.sub("", artistsong).lower():
matched.append(item)
continue
if (
fuzz.ratio(filter, artist) >= 85
or fuzz.ratio(filter, song) >= 85
or fuzz.ratio(filter, album) >= 85
):
matched.append(item)
return matched
2025-07-17 06:55:16 -04:00
    def search_db(
        self,
        artistsong: Optional[str] = None,
        artist: Optional[str] = None,
        song: Optional[str] = None,
        station: str = "main"
    ) -> bool:
        """
        Search for track, add it up next in play queue if found

        Args:
            artistsong (Optional[str]): Artist - Song combo to search [ignored if artist/song are specified]
            artist (Optional[str]): Artist to search (ignored if artistsong is specified)
            song (Optional[str]): Song to search (ignored if artistsong is specified)
            station (str): Station queue to push the match onto (default "main")
        Returns:
            bool: True when a track was found and queued, False otherwise.
        Raises:
            RadioException: When both artistsong and artist/song are supplied,
                or when no usable query is supplied.
        """
        if artistsong and (artist or song):
            raise RadioException("Cannot search using combination provided")
        if not artistsong and (not artist or not song):
            raise RadioException("No query provided")
        try:
            # editdist3() is supplied by the spellfix1 extension loaded below;
            # <= 410 is the fuzzy-match distance threshold, and the closest
            # match wins (ORDER BY ... LIMIT 1).
            search_query: str = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, album, file_path, duration FROM tracks\
            WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
            <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
            if artistsong:
                # Split only on the first " - " so song titles containing the
                # separator survive intact
                artistsong_split: list = artistsong.split(" - ", maxsplit=1)
                (search_artist, search_song) = tuple(artistsong_split)
            else:
                search_artist = artist
                search_song = song
            if not artistsong:
                artistsong = f"{search_artist} - {search_song}"
            if not search_artist or not search_song or not artistsong:
                raise RadioException("No query provided")
            search_params = (
                search_artist.lower(),
                search_song.lower(),
                artistsong.lower(),
            )
            with sqlite3.connect(self.playback_db_path, timeout=2) as db_conn:
                # Load spellfix1 so editdist3() resolves inside the query
                db_conn.enable_load_extension(True)
                for ext in self.sqlite_exts:
                    db_conn.load_extension(ext)
                db_conn.row_factory = sqlite3.Row
                db_cursor = db_conn.execute(search_query, search_params)
                result: Optional[sqlite3.Row | bool] = db_cursor.fetchone()
                if not result or not isinstance(result, sqlite3.Row):
                    return False
                # Normalize whitespace and fill in genre before queueing
                push_obj: dict = {
                    "id": result["id"],
                    "uuid": str(uuid().hex),
                    "artist": double_space.sub(" ", result["artist"].strip()),
                    "song": double_space.sub(" ", result["song"].strip()),
                    "artistsong": result["artistsong"].strip(),
                    "album": result["album"].strip() if result["album"] else "N/A",
                    "genre": self.get_genre(
                        double_space.sub(" ", result["artist"].strip())
                    ),
                    "file_path": result["file_path"],
                    "duration": result["duration"],
                }
                # Insert at the head of the queue so the track plays next
                self.active_playlist[station].insert(0, push_obj)
                return True
        except Exception as e:
            logging.critical("search_db:: Search error occurred: %s", str(e))
            traceback.print_exc()
            return False
2025-04-26 17:17:42 -04:00
def add_genre(self, artist: str, genre: str) -> bool:
"""
Add artist/genre pairing to DB
Args:
artist (str)
genre (str)
Returns:
bool
"""
try:
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
2025-04-22 15:31:26 -04:00
query: str = (
"INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
)
params: tuple[str, str] = (artist, genre)
res = _db.execute(query, params)
if isinstance(res.lastrowid, int):
2025-04-22 15:31:26 -04:00
logging.debug(
"Query executed successfully for %s/%s, committing",
artist,
genre,
)
_db.commit()
return True
2025-04-22 15:31:26 -04:00
logging.debug(
"Failed to store artist/genre pair: %s/%s (res: %s)", artist, genre, res
)
return False
except Exception as e:
2025-04-22 15:31:26 -04:00
logging.info(
"Failed to store artist/genre pair: %s/%s (%s)", artist, genre, str(e)
)
traceback.print_exc()
return False
2025-04-22 15:31:26 -04:00
2025-04-26 17:17:42 -04:00
def add_genres(self, pairs: list[dict[str, str]]) -> bool:
"""
(BATCH) Add artist/genre pairings to DB
Expects list of dicts comprised of artist name (key), genre (value)
Args:
pairs (list[dict[str, str]]): Pairs of artist/genres to add, list of dicts
Returns:
bool
"""
try:
added_rows: int = 0
2025-08-15 13:31:15 -04:00
artist = None
genre = None
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
for pair in pairs:
try:
artist, genre = pair
2025-05-17 08:07:38 -04:00
query: str = "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
params: tuple[str, str] = (artist, genre)
res = _db.execute(query, params)
if isinstance(res.lastrowid, int):
2025-04-22 15:31:26 -04:00
logging.debug(
"add_genres: Query executed successfully for %s/%s",
artist,
genre,
)
added_rows += 1
else:
2025-04-22 15:31:26 -04:00
logging.debug(
"Failed to store artist/genre pair: %s/%s (res: %s)",
artist,
genre,
res,
)
except Exception as e:
2025-04-22 15:31:26 -04:00
logging.info(
"Failed to store artist/genre pair: %s/%s (%s)",
artist,
genre,
str(e),
)
continue
if added_rows:
logging.info("add_genres: Committing %s rows", added_rows)
_db.commit()
return True
logging.info("add_genres: Failed (No rows added)")
return False
except Exception as e:
2025-04-22 15:31:26 -04:00
logging.info("Failed to store artist/genre pairs: %s", str(e))
traceback.print_exc()
return False
2025-07-01 10:34:03 -04:00
def get_genres(self, input_artists: list[str]) -> dict:
"""
Retrieve genres for given list of artists
Batch equivalent of get_genre
Args:
input_artists (list): The artists to query
Returns:
dict[str, str]
"""
time_start: float = time.time()
artist_genre: dict[str, str] = {}
query: str = (
2025-10-07 12:07:45 -04:00
"SELECT REPLACE(GROUP_CONCAT(DISTINCT g.name), ',', ', ') AS genre FROM artists a "
"JOIN artist_genres ag ON a.id = ag.artist_id "
"JOIN genres g ON ag.genre_id = g.id "
"WHERE a.name LIKE ? COLLATE NOCASE"
2025-07-01 10:34:03 -04:00
)
with sqlite3.connect(self.artist_genre_db_path) as _db:
_db.row_factory = sqlite3.Row
for artist in input_artists:
params: tuple[str] = (f"%%{artist}%%",)
_cursor = _db.execute(query, params)
res = _cursor.fetchone()
2025-10-07 12:07:45 -04:00
if not res or not res["genre"]:
2025-07-01 10:34:03 -04:00
artist_genre[artist] = "N/A"
continue
2025-07-17 06:55:16 -04:00
artist_genre[artist] = res["genre"]
2025-07-01 10:34:03 -04:00
time_end: float = time.time()
logging.info(f"Time taken: {time_end - time_start}")
return artist_genre
def get_genre(self, artist: str) -> str:
"""
Retrieve Genre for given Artist
Args:
artist (str): The artist to query
Returns:
str
"""
try:
artist = artist.strip()
2025-04-22 15:49:32 -04:00
query: str = (
2025-10-07 12:07:45 -04:00
"SELECT REPLACE(GROUP_CONCAT(DISTINCT g.name), ',', ', ') AS genre FROM artists a "
"JOIN artist_genres ag ON a.id = ag.artist_id "
"JOIN genres g ON ag.genre_id = g.id "
"WHERE a.name LIKE ? COLLATE NOCASE"
2025-04-22 15:49:32 -04:00
)
params: tuple[str] = (artist,)
with sqlite3.connect(self.playback_db_path, timeout=2) as _db:
_db.row_factory = sqlite3.Row
_cursor = _db.execute(query, params)
res = _cursor.fetchone()
2025-10-07 12:07:45 -04:00
if not res or not res["genre"]:
return "Not Found" # Exception suppressed
# raise RadioException(
# f"Could not locate {artist} in artist_genre_map db."
# )
return res["genre"]
except Exception as e:
logging.info("Failed to look up genre for artist: %s (%s)", artist, str(e))
traceback.print_exc()
return "Not Found"
    async def load_playlists(self) -> None:
        """Load playlists from Redis into self.active_playlist.

        For each station: fetch the stored playlist JSON, shuffle a copy,
        rebuild each entry with a fresh UUID and normalized whitespace, dedupe
        by file_path (when DEDUPE_PLAYLISTS is set), and optionally filter the
        "main" playlist down to self.playback_genres. Finally asks LiquidSoap
        to skip each station so streams pick up the fresh queues.
        """
        try:
            logging.info("Loading playlists...")
            # Start from a clean slate so reloads don't accumulate entries
            if isinstance(self.active_playlist, dict):
                self.active_playlist.clear()
            for playlist in self.playlists:
                playlist_redis_key: str = f"playlist:{playlist}"
                _playlist = await self.redis_client.json().get(playlist_redis_key)  # type: ignore
                # Ensure we always have a list to work with
                if not _playlist:
                    logging.warning("No playlist found in redis for %s, skipping", playlist)
                    self.active_playlist[playlist] = []
                    continue
                # Make sure playlist key exists
                if playlist not in self.active_playlist.keys():
                    self.active_playlist[playlist] = []
                # Shuffle a copy so we don't mutate the underlying redis object
                try:
                    shuffled = list(_playlist)
                    random.shuffle(shuffled)
                except Exception:
                    shuffled = _playlist
                # Build a fresh list rather than modifying in-place (prevents duplication)
                built: list[dict] = []
                for r in shuffled:
                    try:
                        item = {
                            "uuid": str(uuid().hex),
                            "id": r.get("id"),
                            "artist": double_space.sub(" ", (r.get("artist") or "")).strip(),
                            "song": double_space.sub(" ", (r.get("song") or "")).strip(),
                            "album": double_space.sub(" ", (r.get("album") or "")).strip(),
                            "genre": r.get("genre") if r.get("genre") else "Not Found",
                            # Redis stores the combo under "artistdashsong"
                            "artistsong": double_space.sub(" ", (r.get("artistdashsong") or "")).strip(),
                            "file_path": r.get("file_path"),
                            "duration": r.get("duration"),
                        }
                        built.append(item)
                    except Exception:
                        logging.debug("Skipping malformed playlist entry for %s: %s", playlist, r)
                self.active_playlist[playlist] = built
                logging.info(
                    "Populated playlist: %s with %s items",
                    playlist, len(self.active_playlist[playlist]),
                )
                """Dedupe"""
                if self.DEDUPE_PLAYLISTS:
                    logging.info("Removing duplicate tracks (by file_path only)...")
                    dedupe_processed: set[str] = set()
                    deduped_list: list[dict] = []
                    for item in self.active_playlist[playlist]:
                        fp = item.get("file_path")
                        if not fp:
                            # If no file_path available, skip the item (can't dedupe reliably)
                            logging.info("Skipping item without file_path during dedupe: %s", item)
                            continue
                        key = fp
                        if key in dedupe_processed:
                            continue
                        dedupe_processed.add(key)
                        deduped_list.append(item)
                    self.active_playlist[playlist] = deduped_list
                else:
                    logging.warning("Dupe removal disabled")
                logging.info(
                    "Duplicates for playlist: %s removed. New playlist size: %s",
                    playlist, len(self.active_playlist[playlist]),
                )
                # Genre allow-list filtering: applied to "main" only, and only
                # when playback_genres is non-empty
                if playlist == 'main' and self.playback_genres:
                    new_playlist: list[dict] = []
                    logging.info("Limiting playback genres")
                    for item in self.active_playlist[playlist]:
                        item_genres = item.get("genre", "").strip().lower()
                        # Check if any genre matches and item isn't already in new_playlist
                        if any(genre.strip().lower() in item_genres for genre in self.playback_genres):
                            if item not in new_playlist:
                                new_playlist.append(item)
                    self.active_playlist[playlist] = new_playlist
                    logging.info(
                        "%s items for playlist: %s remain for playback after filtering",
                        playlist, len(self.active_playlist[playlist]),
                    )
            """Loading Complete"""
            # Request skip from LS to bring streams current
            for playlist in self.playlists:
                logging.info("Skipping: %s", playlist)
                await self._ls_skip(playlist)
            self.playlists_loaded = True
        except Exception as e:
            logging.info("Playlist load failed: %s", str(e))
            traceback.print_exc()
2025-04-26 19:47:12 -04:00
def cache_album_art(self, track_id: int, file_path: str) -> None:
2025-02-12 07:53:22 -05:00
"""
2025-10-07 12:07:45 -04:00
Cache Album Art to SQLite DB - IMPROVED VERSION
2025-02-12 07:53:22 -05:00
Args:
track_id (int): Track ID to update
file_path (str): Path to file, for artwork extraction
2025-02-12 07:53:22 -05:00
Returns:
None
"""
try:
2025-10-07 12:07:45 -04:00
# Validate file exists first
if not os.path.exists(file_path):
logging.warning("cache_album_art: File not found: %s", file_path)
return
logging.info("cache_album_art: Attempting to store album art for track_id: %s", track_id)
# Check if artwork already exists to avoid duplicates
with sqlite3.connect(self.album_art_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
cursor = db_conn.execute("SELECT track_id FROM album_art WHERE track_id = ?", (track_id,))
if cursor.fetchone():
logging.debug("cache_album_art: Track %s already has album art", track_id)
return
# Load file with better error handling
try:
tagger = music_tag.load_file(file_path)
except Exception as e:
logging.warning("cache_album_art: Failed to load file %s: %s", file_path, e)
return
# Extract artwork with validation
album_art = None
try:
if not tagger:
logging.debug("cache_album_art: No tagger available for track %s", track_id)
return
artwork_field = tagger["artwork"]
if artwork_field and hasattr(artwork_field, 'first') and artwork_field.first:
first_artwork = artwork_field.first
if hasattr(first_artwork, 'data') and first_artwork.data:
potential_art = first_artwork.data
# Validate artwork data
if isinstance(potential_art, bytes) and len(potential_art) > 100:
# Check if it looks like valid image data
if (potential_art.startswith(b'\xff\xd8') or # JPEG
potential_art.startswith(b'\x89PNG') or # PNG
potential_art.startswith(b'GIF87a') or # GIF87a
potential_art.startswith(b'GIF89a') or # GIF89a
potential_art.startswith(b'RIFF')): # WEBP/other RIFF
album_art = potential_art
logging.debug("cache_album_art: Found valid artwork (%s bytes)", len(album_art))
else:
logging.warning("cache_album_art: Invalid artwork format for track %s - not caching", track_id)
return
else:
logging.debug("cache_album_art: No valid artwork data for track %s", track_id)
return
else:
logging.debug("cache_album_art: No artwork data available for track %s", track_id)
return
else:
2025-10-07 12:07:45 -04:00
logging.debug("cache_album_art: No artwork field for track %s", track_id)
return
except Exception as e:
logging.warning("cache_album_art: Error extracting artwork for track %s: %s", track_id, e)
return
# Only proceed if we have valid artwork
if not album_art:
logging.debug("cache_album_art: No valid artwork to cache for track %s", track_id)
return
# Insert into database
try:
with sqlite3.connect(self.album_art_db_path, timeout=5) as db_conn:
cursor = db_conn.execute(
"INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES (?, ?)",
(track_id, album_art)
2025-04-26 17:17:42 -04:00
)
2025-10-07 12:07:45 -04:00
if cursor.rowcount == 1:
db_conn.commit()
logging.info("cache_album_art: Successfully cached %s bytes for track %s", len(album_art), track_id)
else:
logging.debug("cache_album_art: No row inserted for track_id: %s (may already exist)", track_id)
except Exception as e:
logging.error("cache_album_art: Database error for track %s: %s", track_id, e)
except Exception as e:
2025-10-07 12:07:45 -04:00
logging.error("cache_album_art: Unexpected error for track %s: %s", track_id, e)
traceback.print_exc()
2025-04-26 19:47:12 -04:00
def get_album_art(self, track_id: int) -> Optional[bytes]:
2025-02-12 07:53:22 -05:00
"""
Get Album Art
Args:
track_id (int): Track ID to query
2025-02-12 07:53:22 -05:00
Returns:
Optional[bytes]
2025-02-12 07:53:22 -05:00
"""
try:
with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT album_art FROM album_art WHERE track_id = ?"
query_params: tuple[int] = (track_id,)
db_cursor = db_conn.execute(query, query_params)
2025-04-26 17:17:42 -04:00
result: Optional[Union[sqlite3.Row, bool]] = db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row):
return None
return result["album_art"]
except Exception as e:
2025-04-26 17:17:42 -04:00
logging.debug("get_album_art Exception: %s", str(e))
traceback.print_exc()
return None
2025-07-17 06:55:16 -04:00
def get_queue_item_by_uuid(self,
_uuid: str,
station: str = "main") -> Optional[tuple[int, dict]]:
2025-02-11 20:01:07 -05:00
"""
Get queue item by UUID
Args:
uuid: The UUID to search
Returns:
2025-02-15 21:09:33 -05:00
Optional[tuple[int, dict]]
2025-02-11 20:01:07 -05:00
"""
2025-07-17 06:55:16 -04:00
for x, item in enumerate(self.active_playlist[station]):
if item.get("uuid") == _uuid:
2025-02-11 20:01:07 -05:00
return (x, item)
return None
async def _ls_skip(self, station: str = "main") -> bool:
2025-02-12 07:53:22 -05:00
"""
Ask LiquidSoap server to skip to the next track
Args:
station (str): default "main"
2025-02-12 07:53:22 -05:00
Returns:
bool
"""
2025-02-11 20:01:07 -05:00
try:
async with ClientSession() as session:
async with session.post(
f"{self.ls_uri}/next",
data=station,
timeout=ClientTimeout(connect=2, sock_read=2)
) as request:
request.raise_for_status()
text: Optional[str] = await request.text()
return isinstance(text, str) and text.startswith("OK")
2025-02-11 20:01:07 -05:00
except Exception as e:
logging.critical("Skip failed: %s", str(e))
return False # failsafe
async def get_ai_song_info(self, artist: str, song: str) -> Optional[str]:
2025-02-10 20:29:57 -05:00
"""
Get AI Song Info
Args:
artist (str)
song (str)
Returns:
2025-02-15 21:09:33 -05:00
Optional[str]
2025-02-10 20:29:57 -05:00
"""
2025-02-15 21:09:33 -05:00
prompt: str = f" am going to listen to {song} by {artist}."
response: Optional[str] = await self.gpt.get_completion(prompt)
2025-02-10 20:29:57 -05:00
if not response:
logging.critical("No response received from GPT?")
2025-02-14 16:07:24 -05:00
return None
2025-02-10 20:29:57 -05:00
return response
    async def webhook_song_change(self, track: dict, station: str = "main") -> None:
        """
        Handles Song Change Outbounds (Webhooks)

        Args:
            track (dict): Now-playing metadata; reads song, artist, album, genre,
                duration, file_path, and id keys.
            station (str): default "main"
        Returns:
            None
        """
        try:
            """TEMP - ONLY MAIN"""
            if not station == "main":
                return
            # NOTE: webhooks are globally disabled for now -- everything below
            # this early return is unreachable until it is removed.
            return  # Temp disable global
            # First, send track info
            """
            TODO:
            Review friendly_track_start and friendly_track_end, not currently in use
            """
            # friendly_track_start: str = time.strftime(
            #     "%Y-%m-%d %H:%M:%S", time.localtime(track["start"])
            # )
            # friendly_track_end: str = time.strftime(
            #     "%Y-%m-%d %H:%M:%S", time.localtime(track["end"])
            # )
            hook_data: dict = {
                "username": "serious.FM",
                "embeds": [
                    {
                        "title": f"Now Playing on {station.title()}",
                        "description": f"## {track['song']}\nby\n## {track['artist']}",
                        "color": 0x30C56F,
                        # Timestamp query param acts as a cache-buster for the art URL
                        "thumbnail": {
                            "url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{int(time.time())}",
                        },
                        "fields": [
                            {
                                "name": "Duration",
                                "value": self.duration_conv(track["duration"]),
                                "inline": True,
                            },
                            {
                                "name": "Genre",
                                "value": (
                                    track["genre"] if track["genre"] else "Unknown"
                                ),
                                "inline": True,
                            },
                            {
                                "name": "Filetype",
                                # File extension, taken from after the last dot
                                "value": track["file_path"].rsplit(".", maxsplit=1)[1],
                                "inline": True,
                            },
                            {
                                "name": "Higher Res",
                                "value": "[stream/icecast](https://stream.codey.lol/sfm.ogg) | [web player](https://codey.lol/radio)",
                                "inline": True,
                            },
                            {
                                "name": "Album",
                                "value": (
                                    track["album"]
                                    if "album" in track.keys()
                                    else "Unknown"
                                ),
                            },
                        ],
                    }
                ],
            }
            now: float = time.time()
            _sfm: dict = self.webhooks["sfm"]
            if _sfm:
                sfm_hook: str = _sfm.get("hook", "")
                # Throttle: drop the post if the hook fired within the last 5 seconds
                sfm_hook_lastRun: Optional[float] = _sfm.get("lastRun", 0.0)
                if sfm_hook_lastRun and ((now - sfm_hook_lastRun) < 5):
                    logging.info("SFM Webhook: Throttled!")
                    return
                async with ClientSession() as session:
                    async with await session.post(
                        sfm_hook,
                        json=hook_data,
                        timeout=ClientTimeout(connect=5, sock_read=5),
                        headers={
                            "content-type": "application/json; charset=utf-8",
                        },
                    ) as request:
                        request.raise_for_status()
            # Next, AI feedback (for main stream only)
            """
            TEMP. DISABLED
            """
            # if station == "main":
            #     ai_response: Optional[str] = await self.get_ai_song_info(
            #         track["artist"], track["song"]
            #     )
            #     if not ai_response:
            #         return
            #     hook_data = {
            #         "username": "GPT",
            #         "embeds": [
            #             {
            #                 "title": "AI Feedback",
            #                 "color": 0x35D0FF,
            #                 "description": ai_response.strip(),
            #             }
            #         ],
            #     }
            #     ai_hook: str = self.webhooks["gpt"].get("hook")
            #     async with ClientSession() as session:
            #         async with await session.post(
            #             ai_hook,
            #             json=hook_data,
            #             timeout=ClientTimeout(connect=5, sock_read=5),
            #             headers={
            #                 "content-type": "application/json; charset=utf-8",
            #             },
            #         ) as request:
            #             request.raise_for_status()
        except Exception as e:
            logging.info("Webhook error occurred: %s", str(e))
            traceback.print_exc()