import logging
import traceback
import time
import datetime
import os
import random
from uuid import uuid4 as uuid
from typing import Union, Optional, Iterable
from aiohttp import ClientSession, ClientTimeout
import regex
from regex import Pattern
import sqlite3
import gpt
import music_tag  # type: ignore
from rapidfuzz import fuzz
from endpoints.constructors import RadioException

# Collapses runs of two or more whitespace characters.
double_space: Pattern = regex.compile(r"\s{2,}")
# Matches every character that is not an ASCII letter or digit.
non_alnum: Pattern = regex.compile(r"[^a-zA-Z0-9]")

# TODO:
#   - get_genre should only be called once for load_playlist, rework get_genre to
#     (optionally) accept a list of artists, and return (optionally) a list
#     instead of an str
#   - Ask GPT when we encounter an untagged (no genre defined) artist,
#     automation is needed for this tedious task
#   - etc..


class RadioUtil:
    """
    Radio Utils

    Holds the in-memory play queue (``active_playlist``), the SQLite database
    paths used for track/genre/album-art lookups, and the outbound webhook
    configuration.
    """

    def __init__(self, constants, loop) -> None:
        """
        Args:
            constants: Object exposing LS_URI, GPT_WEBHOOK and SFM_WEBHOOK
                (also handed to gpt.GPT)
            loop: Event loop; load_playlist calls run_until_complete() on it
        Returns:
            None
        """
        self.constants = constants
        self.loop = loop
        self.gpt = gpt.GPT(self.constants)
        self.ls_uri: str = self.constants.LS_URI
        # SQLite extensions loaded by search_playlist (needed for editdist3)
        self.sqlite_exts: list[str] = [
            "/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
        ]
        self.active_playlist_path: str = os.path.join(
            "/usr/local/share", "sqlite_dbs", "track_file_map.db"
        )
        self.artist_genre_db_path: str = os.path.join(
            "/usr/local/share", "sqlite_dbs", "artist_genre_map.db"
        )
        self.album_art_db_path: str = os.path.join(
            "/usr/local/share", "sqlite_dbs", "track_album_art.db"
        )
        # Substrings matched against each track's genre by load_playlist;
        # an empty list disables genre filtering.
        self.playback_genres: list[str] = [
            "post-hardcore",
            "post hardcore",
            "metalcore",
            "deathcore",
            "edm",
            "electronic",
        ]
        self.active_playlist: list[dict] = []
        self.playlist_loaded: bool = False
        self.now_playing: dict = {
            "artist": "N/A",
            "song": "N/A",
            "album": "N/A",
            "genre": "N/A",
            "artistsong": "N/A - N/A",
            "duration": 0,
            "start": 0,
            "end": 0,
            "file_path": None,
            "id": None,
        }
        self.webhooks: dict = {
            "gpt": {
                "hook": self.constants.GPT_WEBHOOK,
            },
            "sfm": {
                "hook": self.constants.SFM_WEBHOOK,
            },
        }

    def duration_conv(self, s: Union[int, float]) -> str:
        """
        Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
        Args:
            s (Union[int, float]): seconds to convert
        Returns:
            str
        """
        # str(timedelta) may carry a ".ffffff" fraction; keep the h:m:s part only
        return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]

    def trackdb_typeahead(self, query: str) -> Optional[list[str]]:
        """
        Query track db for typeahead
        Args:
            query (str): The search query
        Returns:
            Optional[list[str]]: Up to 30 "artist - song" strings, or None when
                the query is empty
        """
        if not query:
            return None
        with sqlite3.connect(self.active_playlist_path, timeout=1) as _db:
            _db.row_factory = sqlite3.Row
            db_query: str = (
                'SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),'
                ' (TRIM(artist) || " - " || TRIM(song)) as artistsong'
                " FROM tracks WHERE artistsong LIKE ? LIMIT 30"
            )
            db_params: tuple[str] = (f"%{query}%",)
            _cursor = _db.execute(db_query, db_params)
            result: Iterable[sqlite3.Row] = _cursor.fetchall()
            out_result = [str(r["artistsong"]) for r in result]
            return out_result

    def datatables_search(self, filter: str) -> Optional[list[dict]]:
        """DataTables Search
        Args:
            filter (str): The filter query to fuzzy match with
        Returns:
            list[dict]: List of matching playlist items (if any are found)
        """
        filter = filter.strip().lower()
        # Loop-invariant: alphanumeric-only form of the filter for substring matching
        filter_alnum: str = non_alnum.sub("", filter)
        matched: list[dict] = []
        for item in self.active_playlist:
            artist: Optional[str] = item.get("artist")
            song: Optional[str] = item.get("song")
            artistsong: Optional[str] = item.get("artistsong")
            album: Optional[str] = item.get("album")
            if not artist or not song or not artistsong:
                continue
            if filter_alnum in non_alnum.sub("", artistsong).lower():
                matched.append(item)
                continue
            # The filter is lower-cased, so the compared fields must be too:
            # fuzz.ratio is case-sensitive. Empty fields (e.g. no album) are skipped.
            if any(
                fuzz.ratio(filter, field.lower()) >= 85
                for field in (artist, song, album)
                if field
            ):
                matched.append(item)
        return matched

    def search_playlist(
        self,
        artistsong: Optional[str] = None,
        artist: Optional[str] = None,
        song: Optional[str] = None,
    ) -> bool:
        """
        Search for track, add it up next in play queue if found
        Args:
            artistsong (Optional[str]): Artist - Song combo to search [ignored if artist/song are specified]
            artist (Optional[str]): Artist to search (ignored if artistsong is specified)
            song (Optional[str]): Song to search (ignored if artistsong is specified)
        Returns:
            bool: True if a track was found and inserted at the front of the
                queue, False if nothing matched or an error occurred
        Raises:
            RadioException: If artistsong is combined with artist/song, or if
                neither artistsong nor both artist and song are given
        """
        if artistsong and (artist or song):
            raise RadioException("Cannot search using combination provided")
        if not artistsong and (not artist or not song):
            raise RadioException("No query provided")
        try:
            # editdist3() comes from the extension(s) listed in self.sqlite_exts
            search_query: str = (
                'SELECT id, artist, song, (artist || " - " || song) AS artistsong,'
                " album, file_path, duration FROM tracks"
                ' WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))'
                ' <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?)'
                " ASC LIMIT 1"
            )
            if artistsong:
                artistsong_split: list = artistsong.split(" - ", maxsplit=1)
                if len(artistsong_split) != 2:
                    # No " - " separator: cannot derive artist and song
                    logging.info(
                        "search_playlist:: Invalid artistsong provided: %s", artistsong
                    )
                    return False
                (search_artist, search_song) = artistsong_split
            else:
                search_artist = artist
                search_song = song
                artistsong = f"{search_artist} - {search_song}"
            search_params = (
                search_artist.lower(),
                search_song.lower(),
                artistsong.lower(),
            )
            with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn:
                db_conn.enable_load_extension(True)
                for ext in self.sqlite_exts:
                    db_conn.load_extension(ext)
                db_conn.row_factory = sqlite3.Row
                db_cursor = db_conn.execute(search_query, search_params)
                result: Optional[sqlite3.Row | bool] = db_cursor.fetchone()
                if not result or not isinstance(result, sqlite3.Row):
                    return False
                clean_artist: str = double_space.sub(" ", result["artist"].strip())
                push_obj: dict = {
                    "id": result["id"],
                    "uuid": str(uuid().hex),
                    "artist": clean_artist,
                    "song": double_space.sub(" ", result["song"].strip()),
                    "artistsong": result["artistsong"].strip(),
                    # Same shape as items built by load_playlist;
                    # webhook_song_change reads the album.
                    "album": double_space.sub(" ", result["album"] or "").strip(),
                    "genre": self.get_genre(clean_artist),
                    "file_path": result["file_path"],
                    "duration": result["duration"],
                }
                self.active_playlist.insert(0, push_obj)
                return True
        except Exception as e:
            logging.critical("search_playlist:: Search error occurred: %s", str(e))
            traceback.print_exc()
            return False

    def add_genre(self, artist: str, genre: str) -> bool:
        """
        Add artist/genre pairing to DB
        Args:
            artist (str)
            genre (str)
        Returns:
            bool
        """
        try:
            with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
                query: str = (
                    "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
                )
                params: tuple[str, str] = (artist, genre)
                res = _db.execute(query, params)
                # NOTE(review): lastrowid may be an int even when the INSERT was
                # ignored, so this check may not distinguish inserts from
                # no-ops - confirm intended.
                if isinstance(res.lastrowid, int):
                    logging.debug(
                        "Query executed successfully for %s/%s, committing",
                        artist,
                        genre,
                    )
                    _db.commit()
                    return True
                logging.debug(
                    "Failed to store artist/genre pair: %s/%s (res: %s)",
                    artist,
                    genre,
                    res,
                )
                return False
        except Exception as e:
            logging.info(
                "Failed to store artist/genre pair: %s/%s (%s)", artist, genre, str(e)
            )
            traceback.print_exc()
            return False

    def add_genres(self, pairs: list[dict[str, str]]) -> bool:
        """
        (BATCH) Add artist/genre pairings to DB
        Expects list of dicts comprised of artist name (key), genre (value)
        Args:
            pairs (list[dict[str, str]]): Pairs of artist/genres to add, list of dicts
        Returns:
            bool: True if at least one pair was stored and committed
        """
        try:
            added_rows: int = 0
            query: str = (
                "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
            )
            with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
                for pair in pairs:
                    # Documented shape is {artist: genre}; unpacking a dict
                    # directly would iterate its keys only. Two-item sequences
                    # (artist, genre) are still accepted.
                    entries = pair.items() if isinstance(pair, dict) else (pair,)
                    for entry in entries:
                        try:
                            artist, genre = entry
                            res = _db.execute(query, (artist, genre))
                            if isinstance(res.lastrowid, int):
                                logging.debug(
                                    "add_genres: Query executed successfully for %s/%s",
                                    artist,
                                    genre,
                                )
                                added_rows += 1
                            else:
                                logging.debug(
                                    "Failed to store artist/genre pair: %s/%s (res: %s)",
                                    artist,
                                    genre,
                                    res,
                                )
                        except Exception as e:
                            # One bad pair must not abort the batch
                            logging.info(
                                "Failed to store artist/genre pair: %s (%s)",
                                entry,
                                str(e),
                            )
                            continue
                if added_rows:
                    logging.info("add_genres: Committing %s rows", added_rows)
                    _db.commit()
                    return True
                logging.info("add_genres: Failed (No rows added)")
                return False
        except Exception as e:
            logging.info("Failed to store artist/genre pairs: %s", str(e))
            traceback.print_exc()
            return False

    def get_genre(self, artist: str) -> str:
        """
        Retrieve Genre for given Artist
        Args:
            artist (str): The artist to query
        Returns:
            str: The stored genre, or "Not Found" when there is no match or the
                lookup fails
        """
        try:
            artist = artist.strip()
            query: str = (
                "SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE"
            )
            params: tuple[str] = (f"%%{artist}%%",)
            with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
                _db.row_factory = sqlite3.Row
                _cursor = _db.execute(query, params)
                res = _cursor.fetchone()
                if not res:
                    return "Not Found"  # Exception suppressed
                    # raise RadioException(
                    #     f"Could not locate {artist} in artist_genre_map db."
                    # )
                return res["genre"]
        except Exception as e:
            logging.info("Failed to look up genre for artist: %s (%s)", artist, str(e))
            traceback.print_exc()
            return "Not Found"

    def load_playlist(self) -> None:
        """
        Load Playlist

        Reads all tracks from the track DB, shuffles them, removes duplicates
        (by alphanumeric-only "artist - song"), filters by playback_genres when
        set, then asks LiquidSoap to skip to the next track.
        Errors are logged, not raised.
        Returns:
            None
        """
        try:
            logging.info("Loading playlist...")
            self.active_playlist.clear()

            # --- Alternate query presets (kept for reference) ---
            # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
            # GROUP BY artistdashsong ORDER BY RANDOM()'

            # LIMITED TO ONE/SMALL SUBSET OF GENRES
            # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
            # WHERE (artist LIKE "%winds of plague%" OR artist LIKE "%acacia st%" OR artist LIKE "%suicide si%" OR artist LIKE "%in dying%") AND (NOT song LIKE "%(live%") ORDER BY RANDOM()' #ORDER BY artist DESC, album ASC, song ASC'

            # LIMITED TO ONE/SOME ARTISTS...
            # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, file_path, duration FROM tracks\
            # WHERE (artist LIKE "%outline in color%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()' # ORDER BY album ASC, id ASC'

            # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
            # WHERE (artist LIKE "%sullivan king%" OR artist LIKE "%kayzo%" OR artist LIKE "%adventure club%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'

            # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
            # WHERE (artist LIKE "%akira the don%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'

            # LIMITED GENRES (active query; genre limiting happens further below)
            db_query: str = (
                'SELECT distinct(LOWER(TRIM(artist)) || " - " || LOWER(TRIM(song))), (TRIM(artist) || " - " || TRIM(song))'
                "AS artistdashsong, id, artist, song, album, file_path, duration FROM tracks"
            )

            with sqlite3.connect(
                f"file:{self.active_playlist_path}?mode=ro", uri=True, timeout=15
            ) as db_conn:
                db_conn.row_factory = sqlite3.Row
                db_cursor = db_conn.execute(db_query)
                results: list[sqlite3.Row] = db_cursor.fetchall()

            # get_genre opens a DB connection per call; look each artist up once.
            genre_cache: dict[str, str] = {}
            playlist: list[dict] = []
            for r in results:
                if not r["artist"] or not r["song"]:
                    # A single untagged row should not abort the whole load
                    logging.info(
                        "Skipping track with missing artist/song (id: %s)", r["id"]
                    )
                    continue
                artist: str = double_space.sub(" ", r["artist"]).strip()
                if artist not in genre_cache:
                    genre_cache[artist] = self.get_genre(artist)
                playlist.append(
                    {
                        "uuid": str(uuid().hex),
                        "id": r["id"],
                        "artist": artist,
                        "song": double_space.sub(" ", r["song"]).strip(),
                        # album may be NULL in the DB
                        "album": double_space.sub(" ", r["album"] or "").strip(),
                        "genre": genre_cache[artist],
                        "artistsong": double_space.sub(
                            " ", r["artistdashsong"]
                        ).strip(),
                        "file_path": r["file_path"],
                        "duration": r["duration"],
                    }
                )
            self.active_playlist = playlist
            logging.info(
                "Populated active playlists with %s items",
                len(self.active_playlist),
            )

            random.shuffle(self.active_playlist)

            # Dedupe: build a new list rather than removing from the list being
            # iterated (which skips the element following each removal).
            logging.info("Removing duplicate tracks...")
            seen: set[str] = set()
            deduped: list[dict] = []
            for item in self.active_playlist:
                artistsongabc: str = non_alnum.sub("", item.get("artistsong") or "")
                if not artistsongabc:
                    # Nothing to compare on; the item is kept, as before
                    logging.info("Missing artistsong: %s", item)
                    deduped.append(item)
                    continue
                if artistsongabc in seen:
                    continue
                seen.add(artistsongabc)
                deduped.append(item)
            self.active_playlist = deduped

            logging.info(
                "Duplicates removed. New playlist size: %s",
                len(self.active_playlist),
            )
            logging.info(
                "Playlist: %s",
                [str(a.get("artistsong", "")) for a in self.active_playlist],
            )

            if self.playback_genres:
                logging.info("Limiting playback genres")
                wanted_genres: list[str] = [
                    genre.strip().lower() for genre in self.playback_genres
                ]
                # Keep an item when any wanted genre is a substring of its genre
                self.active_playlist = [
                    item
                    for item in self.active_playlist
                    if any(
                        genre in (item.get("genre") or "").strip().lower()
                        for genre in wanted_genres
                    )
                ]
                logging.info(
                    "%s items remain for playback after filtering",
                    len(self.active_playlist),
                )

            self.playlist_loaded = True
            # NOTE(review): run_until_complete raises if self.loop is already
            # running - confirm this is only called from a synchronous context.
            self.loop.run_until_complete(self._ls_skip())
        except Exception as e:
            logging.info("Playlist load failed: %s", str(e))
            traceback.print_exc()

    def cache_album_art(self, track_id: int, file_path: str) -> None:
        """
        Cache Album Art to SQLite DB
        Args:
            track_id (int): Track ID to update
            file_path (str): Path to file, for artwork extraction
        Returns:
            None
        """
        try:
            logging.info(
                "cache_album_art: Attempting to store album art for track_id: %s",
                track_id,
            )
            tagger = music_tag.load_file(file_path)
            album_art = tagger["artwork"].first.data
            with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
                db_cursor = db_conn.execute(
                    "INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES(?, ?)",
                    (
                        track_id,
                        album_art,
                    ),
                )
                if isinstance(db_cursor.lastrowid, int):
                    db_conn.commit()
                else:
                    logging.debug(
                        "No row inserted for track_id: %s w/ file_path: %s",
                        track_id,
                        file_path,
                    )
        except Exception as e:
            # Best-effort: any failure (e.g. extracting artwork) is logged only
            logging.debug("cache_album_art Exception: %s", str(e))
            traceback.print_exc()

    def get_album_art(self, track_id: int) -> Optional[bytes]:
        """
        Get Album Art
        Args:
            track_id (int): Track ID to query
        Returns:
            Optional[bytes]: The stored artwork, or None when there is no row
                or the lookup fails
        """
        try:
            with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
                db_conn.row_factory = sqlite3.Row
                query: str = "SELECT album_art FROM album_art WHERE track_id = ?"
                query_params: tuple[int] = (track_id,)

                db_cursor = db_conn.execute(query, query_params)
                result: Optional[Union[sqlite3.Row, bool]] = db_cursor.fetchone()
                if not result or not isinstance(result, sqlite3.Row):
                    return None
                return result["album_art"]
        except Exception as e:
            logging.debug("get_album_art Exception: %s", str(e))
            traceback.print_exc()
            return None

    def get_queue_item_by_uuid(self, _uuid: str) -> Optional[tuple[int, dict]]:
        """
        Get queue item by UUID
        Args:
            _uuid: The UUID to search
        Returns:
            Optional[tuple[int, dict]]: (queue index, item), or None if no item
                carries that UUID
        """
        for x, item in enumerate(self.active_playlist):
            if item.get("uuid") == _uuid:
                return (x, item)
        return None

    async def _ls_skip(self) -> bool:
        """
        Ask LiquidSoap server to skip to the next track
        Args:
            None
        Returns:
            bool: True if the server answered "OK"
        """
        try:
            async with ClientSession() as session:
                async with session.get(
                    f"{self.ls_uri}/next", timeout=ClientTimeout(connect=2, sock_read=2)
                ) as request:
                    request.raise_for_status()
                    text: Optional[str] = await request.text()
                    return text == "OK"
        except Exception as e:
            logging.debug("Skip failed: %s", str(e))

        return False  # failsafe

    async def get_ai_song_info(self, artist: str, song: str) -> Optional[str]:
        """
        Get AI Song Info
        Args:
            artist (str)
            song (str)
        Returns:
            Optional[str]: The completion text, or None if no response was received
        """
        prompt: str = f"I am going to listen to {song} by {artist}."
        response: Optional[str] = await self.gpt.get_completion(prompt)
        if not response:
            logging.critical("No response received from GPT?")
            return None
        return response

    async def webhook_song_change(self, track: dict) -> None:
        """
        Handles Song Change Outbounds (Webhooks)

        Posts a "Now Playing" embed to the sfm webhook, then posts AI feedback
        for the track to the gpt webhook. Errors are logged, not raised.
        Args:
            track (dict): Track to announce; reads song, artist, id, duration,
                genre, file_path and album
        Returns:
            None
        """
        try:
            # First, send track info
            # TODO: Review friendly_track_start and friendly_track_end, not currently in use
            # friendly_track_start: str = time.strftime(
            #     "%Y-%m-%d %H:%M:%S", time.localtime(track["start"])
            # )
            # friendly_track_end: str = time.strftime(
            #     "%Y-%m-%d %H:%M:%S", time.localtime(track["end"])
            # )

            # Extension without the dot; empty when the path has none
            file_ext: str = os.path.splitext(track["file_path"] or "")[1].lstrip(".")
            hook_data: dict = {
                "username": "serious.FM",
                "embeds": [
                    {
                        "title": "Now Playing",
                        "description": f"## {track['song']}\nby\n## {track['artist']}",
                        "color": 0x30C56F,
                        "thumbnail": {
                            # Timestamp appended as a throwaway query parameter
                            "url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{int(time.time())}",
                        },
                        "fields": [
                            {
                                "name": "Duration",
                                "value": self.duration_conv(track["duration"]),
                                "inline": True,
                            },
                            {
                                "name": "Genre",
                                "value": track.get("genre") or "Unknown",
                                "inline": True,
                            },
                            {
                                "name": "Filetype",
                                "value": file_ext or "Unknown",
                                "inline": True,
                            },
                            {
                                "name": "Higher Res",
                                "value": "[stream/icecast](https://stream.codey.lol/sfm.ogg) | [web player](https://codey.lol/radio)",
                                "inline": True,
                            },
                            {
                                "name": "Album",
                                # Not every queue item is guaranteed to carry an album
                                "value": track.get("album") or "Unknown",
                            },
                        ],
                    }
                ],
            }

            sfm_hook: str = self.webhooks["sfm"].get("hook")
            async with ClientSession() as session:
                async with session.post(
                    sfm_hook,
                    json=hook_data,
                    timeout=ClientTimeout(connect=5, sock_read=5),
                    headers={
                        "content-type": "application/json; charset=utf-8",
                    },
                ) as request:
                    request.raise_for_status()

            # Next, AI feedback
            ai_response: Optional[str] = await self.get_ai_song_info(
                track["artist"], track["song"]
            )
            if not ai_response:
                return

            hook_data = {
                "username": "GPT",
                "embeds": [
                    {
                        "title": "AI Feedback",
                        "color": 0x35D0FF,
                        "description": ai_response.strip(),
                    }
                ],
            }

            ai_hook: str = self.webhooks["gpt"].get("hook")
            async with ClientSession() as session:
                async with session.post(
                    ai_hook,
                    json=hook_data,
                    timeout=ClientTimeout(connect=5, sock_read=5),
                    headers={
                        "content-type": "application/json; charset=utf-8",
                    },
                ) as request:
                    request.raise_for_status()
        except Exception as e:
            logging.info("Webhook error occurred: %s", str(e))
            traceback.print_exc()