WIP: additional radio stations

This commit is contained in:
2025-07-17 06:55:16 -04:00
parent fd300743c8
commit 85182b7d8c
5 changed files with 162 additions and 139 deletions

View File

@ -18,15 +18,6 @@ from endpoints.constructors import RadioException
double_space: Pattern = regex.compile(r"\s{2,}")
non_alnum: Pattern = regex.compile(r"[^a-zA-Z0-9]")
"""
TODO:
- get_genre should only be called once for load_playlist, rework get_genre to (optionally) accept a list of artists,
and return (optionally) a list instead of an str
- Ask GPT when we encounter an untagged (no genre defined) artist, automation is needed for this tedious task
- etc..
"""
class RadioUtil:
"""
Radio Utils
@ -40,7 +31,7 @@ class RadioUtil:
self.sqlite_exts: list[str] = [
"/home/kyle/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
]
self.active_playlist_path: str = os.path.join(
self.playback_db_path: str = os.path.join(
"/usr/local/share", "sqlite_dbs", "track_file_map.db"
)
self.artist_genre_db_path: str = os.path.join(
@ -49,6 +40,14 @@ class RadioUtil:
self.album_art_db_path: str = os.path.join(
"/usr/local/share", "sqlite_dbs", "track_album_art.db"
)
self.db_queries = {
'main': self.constants.RADIO_DB_QUERY,
'rap': self.constants.RADIO_DB_QUERY_RAP,
'pop': self.constants.RADIO_DB_QUERY_POP,
'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
'rock': self.constants.RADIO_DB_QUERY_ROCK,
'electronic': self.constants.RADIO_DB_QUERY_ELECTRONIC,
}
self.playback_genres: list[str] = [
# "metal",
# # "hip hop",
@ -64,19 +63,21 @@ class RadioUtil:
# # "pop punk",
# # "pop-punk",
]
self.active_playlist: list[dict] = []
self.playlist_loaded: bool = False
self.now_playing: dict = {
"artist": "N/A",
"song": "N/A",
"album": "N/A",
"genre": "N/A",
"artistsong": "N/A - N/A",
"duration": 0,
"start": 0,
"end": 0,
"file_path": None,
"id": None,
self.active_playlist: dict[str, list[dict]] = {}
self.playlists_loaded: bool = False
self.now_playing: dict[str, dict] = {
k: {
"artist": "N/A",
"song": "N/A",
"album": "N/A",
"genre": "N/A",
"artistsong": "N/A - N/A",
"duration": 0,
"start": 0,
"end": 0,
"file_path": None,
"id": None,
} for k in self.db_queries.keys()
}
self.webhooks: dict = {
"gpt": {
@ -107,7 +108,7 @@ class RadioUtil:
"""
if not query:
return None
with sqlite3.connect(self.active_playlist_path, timeout=1) as _db:
with sqlite3.connect(self.playback_db_path, timeout=1) as _db:
_db.row_factory = sqlite3.Row
db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\
(TRIM(artist) || " - " || TRIM(song)) as artistsong FROM tracks WHERE\
@ -118,7 +119,9 @@ class RadioUtil:
out_result = [str(r["artistsong"]) for r in result]
return out_result
def datatables_search(self, filter: str) -> Optional[list[dict]]:
def datatables_search(self,
filter: str,
station: str = "main") -> Optional[list[dict]]:
"""DataTables Search
Args:
@ -129,7 +132,7 @@ class RadioUtil:
"""
filter = filter.strip().lower()
matched: list[dict] = []
for item in self.active_playlist:
for item in self.active_playlist[station]:
artist: str = item.get("artist", None)
song: str = item.get("song", None)
artistsong: str = item.get("artistsong", None)
@ -147,11 +150,12 @@ class RadioUtil:
matched.append(item)
return matched
def search_playlist(
def search_db(
self,
artistsong: Optional[str] = None,
artist: Optional[str] = None,
song: Optional[str] = None,
station: str = "main"
) -> bool:
"""
Search for track, add it up next in play queue if found
@ -183,7 +187,7 @@ class RadioUtil:
search_song.lower(),
artistsong.lower(),
)
with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn:
with sqlite3.connect(self.playback_db_path, timeout=2) as db_conn:
db_conn.enable_load_extension(True)
for ext in self.sqlite_exts:
db_conn.load_extension(ext)
@ -204,10 +208,10 @@ class RadioUtil:
"file_path": result["file_path"],
"duration": result["duration"],
}
self.active_playlist.insert(0, push_obj)
self.active_playlist[station].insert(0, push_obj)
return True
except Exception as e:
logging.critical("search_playlist:: Search error occurred: %s", str(e))
logging.critical("search_db:: Search error occurred: %s", str(e))
traceback.print_exc()
return False
@ -321,7 +325,7 @@ class RadioUtil:
if not res:
artist_genre[artist] = "N/A"
continue
artist_genre[artist] = res["genre"].title()
artist_genre[artist] = res["genre"]
time_end: float = time.time()
logging.info(f"Time taken: {time_end - time_start}")
return artist_genre
@ -356,94 +360,87 @@ class RadioUtil:
traceback.print_exc()
return "Not Found"
def load_playlist(self) -> None:
"""Load Playlist"""
def load_playlists(self) -> None:
"""Load Playlists"""
try:
logging.info("Loading playlist...")
self.active_playlist.clear()
db_query: str = self.constants.RADIO_DB_QUERY
logging.info("Loading playlists...")
if isinstance(self.active_playlist, dict):
self.active_playlist.clear()
with sqlite3.connect(
f"file:{self.active_playlist_path}?mode=ro", uri=True, timeout=15
f"file:{self.playback_db_path}?mode=ro", uri=True, timeout=15
) as db_conn:
db_conn.row_factory = sqlite3.Row
db_cursor = db_conn.execute(db_query)
results: list[sqlite3.Row] = db_cursor.fetchall()
self.active_playlist = [
{
"uuid": str(uuid().hex),
"id": r["id"],
"artist": double_space.sub(" ", r["artist"]).strip(),
"song": double_space.sub(" ", r["song"]).strip(),
"album": double_space.sub(" ", r["album"]).strip(),
"genre": r["genre"].title() if r["genre"] else "Not Found",
"artistsong": double_space.sub(
" ", r["artistdashsong"]
).strip(),
"file_path": r["file_path"],
"duration": r["duration"],
}
for r in results
if r not in self.active_playlist
]
logging.info(
"Populated active playlists with %s items",
len(self.active_playlist),
)
random.shuffle(self.active_playlist)
"""Dedupe"""
logging.info("Removing duplicate tracks...")
dedupe_processed = []
for item in self.active_playlist:
artistsongabc: str = non_alnum.sub("", item.get("artistsong", None))
if not artistsongabc:
logging.info("Missing artistsong: %s", item)
for station in self.db_queries:
db_query = self.db_queries.get(station)
if not db_query:
logging.critical("No query found for %s", station)
continue
if artistsongabc in dedupe_processed:
self.active_playlist.remove(item)
dedupe_processed.append(artistsongabc)
logging.info(
"Duplicates removed.New playlist size: %s",
len(self.active_playlist),
)
logging.info(
"Playlist: %s",
[str(a.get("artistsong", "")) for a in self.active_playlist],
)
if self.playback_genres:
new_playlist: list[dict] = []
logging.info("Limiting playback genres")
# for item in self.active_playlist:
# matched_genre: bool = False
# item_genres: str = item.get("genre", "").strip().lower()
# for genre in self.playback_genres:
# genre = genre.strip().lower()
# if genre in item_genres:
# if item in new_playlist:
# continue
# new_playlist.append(item)
# matched_genre = True
# continue
# if matched_genre:
# continue
for item in self.active_playlist:
item_genres = item.get("genre", "").strip().lower()
# Check if any genre matches and item isn't already in new_playlist
if any(genre.strip().lower() in item_genres for genre in self.playback_genres):
if item not in new_playlist:
new_playlist.append(item)
self.active_playlist = new_playlist
if station not in self.active_playlist:
self.active_playlist[station] = []
db_cursor = db_conn.execute(db_query)
results: list[sqlite3.Row] = db_cursor.fetchall()
self.active_playlist[station] = [
{
"uuid": str(uuid().hex),
"id": r["id"],
"artist": double_space.sub(" ", r["artist"]).strip(),
"song": double_space.sub(" ", r["song"]).strip(),
"album": double_space.sub(" ", r["album"]).strip(),
"genre": r["genre"] if r["genre"] else "Not Found",
"artistsong": double_space.sub(
" ", r["artistdashsong"]
).strip(),
"file_path": r["file_path"],
"duration": r["duration"],
}
for r in results
if r not in self.active_playlist[station]
]
logging.info(
"%s items remain for playback after filtering",
len(self.active_playlist),
"Populated playlist: %s with %s items",
station, len(self.active_playlist[station]),
)
self.playlist_loaded = True
random.shuffle(self.active_playlist[station])
"""Dedupe"""
logging.info("Removing duplicate tracks...")
dedupe_processed = []
for item in self.active_playlist[station]:
artistsongabc: str = non_alnum.sub("", item.get("artistsong", None))
if not artistsongabc:
logging.info("Missing artistsong: %s", item)
continue
if artistsongabc in dedupe_processed:
self.active_playlist[station].remove(item)
dedupe_processed.append(artistsongabc)
logging.info(
"Duplicates for playlist: %s removed. New playlist size: %s",
station, len(self.active_playlist[station]),
)
# logging.info(
# "Playlist: %s",
# [str(a.get("artistsong", "")) for a in self.active_playlist[station]],
# )
if station == 'main' and self.playback_genres:
new_playlist: list[dict] = []
logging.info("Limiting playback genres")
for item in self.active_playlist[station]:
item_genres = item.get("genre", "").strip().lower()
# Check if any genre matches and item isn't already in new_playlist
if any(genre.strip().lower() in item_genres for genre in self.playback_genres):
if item not in new_playlist:
new_playlist.append(item)
self.active_playlist[station] = new_playlist
logging.info(
"%s items for playlist: %s remain for playback after filtering",
station, len(self.active_playlist[station]),
)
self.playlists_loaded = True
self.loop.run_until_complete(self._ls_skip())
except Exception as e:
logging.info("Playlist load failed: %s", str(e))
@ -509,7 +506,9 @@ class RadioUtil:
traceback.print_exc()
return None
def get_queue_item_by_uuid(self, _uuid: str) -> Optional[tuple[int, dict]]:
def get_queue_item_by_uuid(self,
_uuid: str,
station: str = "main") -> Optional[tuple[int, dict]]:
"""
Get queue item by UUID
Args:
@ -517,7 +516,7 @@ class RadioUtil:
Returns:
Optional[tuple[int, dict]]
"""
for x, item in enumerate(self.active_playlist):
for x, item in enumerate(self.active_playlist[station]):
if item.get("uuid") == _uuid:
return (x, item)
return None