swap threading for multiprocessing's ThreadPool (radio playlist load), aiosqlite -> sqlite3 standard lib as disk i/o is blocking regardless; changes related to #32 for radio queue pagination, more work needed

This commit is contained in:
codey 2025-04-26 12:01:45 -04:00
parent 6502199b5d
commit 4c5d2b6943
3 changed files with 157 additions and 127 deletions

View File

@ -281,6 +281,17 @@ class ValidRadioReshuffleRequest(ValidRadioNextRequest):
"""
class ValidRadioQueueRequest(BaseModel):
"""
- **draw**: DataTables draw count, default 1
- **start**: paging start position, default 0
- **search**: Optional search query
"""
draw: Optional[int] = 1
start: Optional[int] = 0
search: Optional[str] = None
class ValidRadioQueueShiftRequest(BaseModel):
"""
- **key**: API Key
@ -300,4 +311,4 @@ class ValidRadioQueueRemovalRequest(BaseModel):
"""
key: str
uuid: str
uuid: str

View File

@ -2,8 +2,7 @@ import logging
import traceback
import time
import random
import asyncio
import threading
from multiprocessing.pool import ThreadPool as Pool
from .constructors import (
ValidRadioNextRequest,
ValidRadioReshuffleRequest,
@ -11,10 +10,9 @@ from .constructors import (
ValidRadioQueueRemovalRequest,
ValidRadioSongRequest,
ValidRadioTypeaheadRequest,
ValidRadioQueueRequest,
)
from utils import radio_util
from uuid import uuid4 as uuid
from typing import Optional
from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException
from fastapi.responses import RedirectResponse, JSONResponse
@ -59,12 +57,10 @@ class Radio(FastAPI):
async def on_start(self) -> None:
logging.info("radio: Initializing")
thread = threading.Thread(
target=asyncio.run, args=(self.radio_util.load_playlist(),)
)
thread.start()
# await self.radio_util.load_playlist()
await self.radio_util._ls_skip()
with Pool() as pool:
res = pool.apply_async(self.radio_util.load_playlist)
if res:
await self.radio_util._ls_skip()
async def radio_skip(
self, data: ValidRadioNextRequest, request: Request
@ -101,6 +97,8 @@ class Radio(FastAPI):
},
)
except Exception as e:
logging.debug("radio_skip Exception: %s",
str(e))
traceback.print_exc()
return JSONResponse(
status_code=500,
@ -124,18 +122,23 @@ class Radio(FastAPI):
return JSONResponse(content={"ok": True})
async def radio_get_queue(
self, request: Request, limit: Optional[int] = 15_000
self, request: Request, data: ValidRadioQueueRequest,
) -> JSONResponse:
"""
Get current play queue, up to limit [default: 15k]
- **limit**: Number of queue items to return, default 15k
Get current play queue (paged, 20 results per page)
"""
queue: list = self.radio_util.active_playlist[0:limit]
start: int = int(data.start)
end: int = start+20
logging.info("queue request with start pos: %s & end pos: %s",
start, end)
queue_full: list = self.radio_util.active_playlist
queue: list = queue_full[start:end]
logging.info("queue length: %s", len(queue))
queue_out: list[dict] = []
for x, item in enumerate(queue):
queue_out.append(
{
"pos": x,
"pos": queue_full.index(item),
"id": item.get("id"),
"uuid": item.get("uuid"),
"artist": item.get("artist"),
@ -146,7 +149,13 @@ class Radio(FastAPI):
"duration": item.get("duration"),
}
)
return JSONResponse(content={"items": queue_out})
out_json = {
"draw": data.draw,
"recordsTotal": len(queue_full),
"recordsFiltered": len(queue_full) if not data.search else len(queue_full), # todo: implement search
"items": queue_out,
}
return JSONResponse(content=out_json)
async def radio_queue_shift(
self, data: ValidRadioQueueShiftRequest, request: Request
@ -230,6 +239,8 @@ class Radio(FastAPI):
)
return Response(content=album_art, media_type="image/png")
except Exception as e:
logging.debug("album_art_handler Exception: %s",
str(e))
traceback.print_exc()
return RedirectResponse(
url="https://codey.lol/images/radio_art_default.jpg", status_code=302
@ -256,7 +267,7 @@ class Radio(FastAPI):
) -> JSONResponse:
"""
Get next track
Track will be removed from the queue in the process.
(Track will be removed from the queue in the process.)
- **key**: API key
- **skipTo**: Optional UUID to skip to
"""

View File

@ -8,7 +8,7 @@ from typing import Union, Optional, Iterable
from aiohttp import ClientSession, ClientTimeout
import regex
from regex import Pattern
import aiosqlite as sqlite3
import sqlite3
import gpt
import music_tag # type: ignore
from endpoints.constructors import RadioException
@ -49,12 +49,12 @@ class RadioUtil:
"/usr/local/share", "sqlite_dbs", "track_album_art.db"
)
self.playback_genres: list[str] = [
"post-hardcore",
"post hardcore",
"metalcore",
"deathcore",
"edm",
"electronic",
# "post-hardcore",
# "post hardcore",
# "metalcore",
# "deathcore",
# "edm",
# "electronic",
]
self.active_playlist: list[dict] = []
self.playlist_loaded: bool = False
@ -99,16 +99,16 @@ class RadioUtil:
"""
if not query:
return None
async with sqlite3.connect(self.active_playlist_path, timeout=1) as _db:
with sqlite3.connect(self.active_playlist_path, timeout=1) as _db:
_db.row_factory = sqlite3.Row
db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\
(TRIM(artist) || " - " || TRIM(song)) as artistsong FROM tracks WHERE\
artistsong LIKE ? LIMIT 30"""
db_params: tuple[str] = (f"%{query}%",)
async with _db.execute(db_query, db_params) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
out_result = [str(r["artistsong"]) for r in result]
return out_result
_cursor = _db.execute(db_query, db_params)
result: Iterable[sqlite3.Row] = _cursor.fetchall()
out_result = [str(r["artistsong"]) for r in result]
return out_result
async def search_playlist(
self,
@ -148,31 +148,31 @@ class RadioUtil:
search_song.lower(),
artistsong.lower(),
)
async with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn:
await db_conn.enable_load_extension(True)
with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn:
db_conn.enable_load_extension(True)
for ext in self.sqlite_exts:
await db_conn.load_extension(ext)
db_conn.load_extension(ext)
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(
db_cursor = db_conn.execute(
search_query, search_params
) as db_cursor:
result: Optional[sqlite3.Row | bool] = await db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row):
return False
push_obj: dict = {
"id": result["id"],
"uuid": str(uuid().hex),
"artist": double_space.sub(" ", result["artist"].strip()),
"song": double_space.sub(" ", result["song"].strip()),
"artistsong": result["artistsong"].strip(),
"genre": await self.get_genre(
double_space.sub(" ", result["artist"].strip())
),
"file_path": result["file_path"],
"duration": result["duration"],
}
self.active_playlist.insert(0, push_obj)
return True
)
result: Optional[sqlite3.Row | bool] = db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row):
return False
push_obj: dict = {
"id": result["id"],
"uuid": str(uuid().hex),
"artist": double_space.sub(" ", result["artist"].strip()),
"song": double_space.sub(" ", result["song"].strip()),
"artistsong": result["artistsong"].strip(),
"genre": self.get_genre(
double_space.sub(" ", result["artist"].strip())
),
"file_path": result["file_path"],
"duration": result["duration"],
}
self.active_playlist.insert(0, push_obj)
return True
except Exception as e:
logging.critical("search_playlist:: Search error occurred: %s", str(e))
traceback.print_exc()
@ -188,19 +188,19 @@ class RadioUtil:
bool
"""
try:
async with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
query: str = (
"INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
)
params: tuple[str, str] = (artist, genre)
res = await _db.execute_insert(query, params)
if res:
res = _db.execute(query, params)
if isinstance(res.lastrowid, int):
logging.debug(
"Query executed successfully for %s/%s, committing",
artist,
genre,
)
await _db.commit()
_db.commit()
return True
logging.debug(
"Failed to store artist/genre pair: %s/%s (res: %s)", artist, genre, res
@ -224,7 +224,7 @@ class RadioUtil:
"""
try:
added_rows: int = 0
async with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
for pair in pairs:
try:
artist, genre = pair
@ -232,8 +232,8 @@ class RadioUtil:
"INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
)
params: tuple[str, str] = (artist, genre)
res = await _db.execute_insert(query, params)
if res:
res = _db.execute(query, params)
if isinstance(res.lastrowid, int):
logging.debug(
"add_genres: Query executed successfully for %s/%s",
artist,
@ -257,7 +257,7 @@ class RadioUtil:
continue
if added_rows:
logging.info("add_genres: Committing %s rows", added_rows)
await _db.commit()
_db.commit()
return True
logging.info("add_genres: Failed (No rows added)")
return False
@ -266,7 +266,7 @@ class RadioUtil:
traceback.print_exc()
return False
async def get_genre(self, artist: str) -> str:
def get_genre(self, artist: str) -> str:
"""
Retrieve Genre for given Artist
Args:
@ -280,22 +280,22 @@ class RadioUtil:
"SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE"
)
params: tuple[str] = (f"%%{artist}%%",)
async with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
_db.row_factory = sqlite3.Row
async with await _db.execute(query, params) as _cursor:
res = await _cursor.fetchone()
if not res:
return "Not Found" # Exception suppressed
# raise RadioException(
# f"Could not locate {artist} in artist_genre_map db."
# )
return res["genre"]
_cursor = _db.execute(query, params)
res = _cursor.fetchone()
if not res:
return "Not Found" # Exception suppressed
# raise RadioException(
# f"Could not locate {artist} in artist_genre_map db."
# )
return res["genre"]
except Exception as e:
logging.info("Failed to look up genre for artist: %s (%s)", artist, str(e))
traceback.print_exc()
return "Not Found"
async def load_playlist(self) -> None:
def load_playlist(self) -> None:
"""Load Playlist"""
try:
logging.info("Loading playlist...")
@ -332,54 +332,54 @@ class RadioUtil:
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
# WHERE (artist LIKE "%akira the don%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'
async with sqlite3.connect(
with sqlite3.connect(
f"file:{self.active_playlist_path}?mode=ro", uri=True, timeout=15
) as db_conn:
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(db_query) as db_cursor:
results: list[sqlite3.Row] = await db_cursor.fetchall()
self.active_playlist = [
{
"uuid": str(uuid().hex),
"id": r["id"],
"artist": double_space.sub(" ", r["artist"]).strip(),
"song": double_space.sub(" ", r["song"]).strip(),
"album": double_space.sub(" ", r["album"]).strip(),
"genre": await self.get_genre(
double_space.sub(" ", r["artist"]).strip()
),
"artistsong": double_space.sub(
" ", r["artistdashsong"]
).strip(),
"file_path": r["file_path"],
"duration": r["duration"],
}
for r in results
]
db_cursor = db_conn.execute(db_query)
results: list[sqlite3.Row] = db_cursor.fetchall()
self.active_playlist = [
{
"uuid": str(uuid().hex),
"id": r["id"],
"artist": double_space.sub(" ", r["artist"]).strip(),
"song": double_space.sub(" ", r["song"]).strip(),
"album": double_space.sub(" ", r["album"]).strip(),
"genre": self.get_genre(
double_space.sub(" ", r["artist"]).strip()
),
"artistsong": double_space.sub(
" ", r["artistdashsong"]
).strip(),
"file_path": r["file_path"],
"duration": r["duration"],
}
for r in results
]
logging.info(
"Populated active playlists with %s items",
len(self.active_playlist),
)
if self.playback_genres:
new_playlist: list[dict] = []
logging.info("Limiting playback genres")
for item in self.active_playlist:
matched_genre: bool = False
item_genres: str = item.get("genre", "").strip().lower()
for genre in self.playback_genres:
genre = genre.strip().lower()
if genre in item_genres:
new_playlist.append(item)
matched_genre = True
continue
if matched_genre:
continue
self.active_playlist = new_playlist
logging.info(
"Populated active playlists with %s items",
"%s items remain for playback after filtering",
len(self.active_playlist),
)
if self.playback_genres:
new_playlist: list[dict] = []
logging.info("Limiting playback genres")
for item in self.active_playlist:
matched_genre: bool = False
item_genres: str = item.get("genre", "").strip().lower()
for genre in self.playback_genres:
genre = genre.strip().lower()
if genre in item_genres:
new_playlist.append(item)
matched_genre = True
continue
if matched_genre:
continue
self.active_playlist = new_playlist
logging.info(
"%s items remain for playback after filtering",
len(self.active_playlist),
)
self.playlist_loaded = True
self.playlist_loaded = True
except Exception as e:
logging.info("Playlist load failed: %s", str(e))
traceback.print_exc()
@ -400,16 +400,22 @@ class RadioUtil:
)
tagger = music_tag.load_file(file_path)
album_art = tagger["artwork"].first.data
async with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
async with await db_conn.execute(
with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
db_cursor = db_conn.execute(
"INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES(?, ?)",
(
track_id,
album_art,
),
) as db_cursor:
await db_conn.commit()
except:
)
if isinstance(db_cursor.lastrowid, int):
db_conn.commit()
else:
logging.debug("No row inserted for track_id: %s w/ file_path: %s", track_id,
file_path)
except Exception as e:
logging.debug("cache_album_art Exception: %s",
str(e))
traceback.print_exc()
async def get_album_art(self, track_id: int) -> Optional[bytes]:
@ -421,19 +427,21 @@ class RadioUtil:
Optional[bytes]
"""
try:
async with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT album_art FROM album_art WHERE track_id = ?"
query_params: tuple[int] = (track_id,)
async with await db_conn.execute(query, query_params) as db_cursor:
result: Optional[Union[sqlite3.Row, bool]] = (
await db_cursor.fetchone()
)
if not result or not isinstance(result, sqlite3.Row):
return None
return result["album_art"]
except:
db_cursor = db_conn.execute(query, query_params)
result: Optional[Union[sqlite3.Row, bool]] = (
db_cursor.fetchone()
)
if not result or not isinstance(result, sqlite3.Row):
return None
return result["album_art"]
except Exception as e:
logging.debug("get_album_art Exception: %s",
str(e))
traceback.print_exc()
return None