reformat / resolves #32

This commit is contained in:
codey 2025-04-26 17:17:42 -04:00
parent 4c5d2b6943
commit 6c29c6fede
9 changed files with 74 additions and 52 deletions

View File

@ -292,6 +292,7 @@ class ValidRadioQueueRequest(BaseModel):
start: Optional[int] = 0
search: Optional[str] = None
class ValidRadioQueueShiftRequest(BaseModel):
"""
- **key**: API Key
@ -311,4 +312,4 @@ class ValidRadioQueueRemovalRequest(BaseModel):
"""
key: str
uuid: str
uuid: str

View File

@ -84,9 +84,7 @@ class KarmaDB:
"INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)"
)
friendly_flag: str = "++" if not flag else "--"
audit_message: str = (
f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
)
audit_message: str = f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
audit_query: str = (
"INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)"
)

View File

@ -115,7 +115,7 @@ class LyricSearch(FastAPI):
if data.src.upper() not in self.acceptable_request_sources:
await self.notifier.send(
f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
f"Unknown request source: {data.src}",
)
return JSONResponse(

View File

@ -97,8 +97,7 @@ class Radio(FastAPI):
},
)
except Exception as e:
logging.debug("radio_skip Exception: %s",
str(e))
logging.debug("radio_skip Exception: %s", str(e))
traceback.print_exc()
return JSONResponse(
status_code=500,
@ -122,16 +121,22 @@ class Radio(FastAPI):
return JSONResponse(content={"ok": True})
async def radio_get_queue(
self, request: Request, data: ValidRadioQueueRequest,
self,
request: Request,
data: ValidRadioQueueRequest,
) -> JSONResponse:
"""
Get current play queue (paged, 20 results per page)
"""
start: int = int(data.start)
end: int = start+20
logging.info("queue request with start pos: %s & end pos: %s",
start, end)
queue_full: list = self.radio_util.active_playlist
end: int = start + 20
search: Optional[str] = data.search
logging.info("queue request with start pos: %s & end pos: %s", start, end)
if not search:
queue_full: list = self.radio_util.active_playlist
else:
queue_full: list = self.radio_util.datatables_search(data.search)
queue: list = queue_full[start:end]
logging.info("queue length: %s", len(queue))
queue_out: list[dict] = []
@ -152,7 +157,9 @@ class Radio(FastAPI):
out_json = {
"draw": data.draw,
"recordsTotal": len(queue_full),
"recordsFiltered": len(queue_full) if not data.search else len(queue_full), # todo: implement search
"recordsFiltered": (
len(queue_full) if not data.search else len(queue_full)
), # todo: implement search
"items": queue_out,
}
return JSONResponse(content=out_json)
@ -239,8 +246,7 @@ class Radio(FastAPI):
)
return Response(content=album_art, media_type="image/png")
except Exception as e:
logging.debug("album_art_handler Exception: %s",
str(e))
logging.debug("album_art_handler Exception: %s", str(e))
traceback.print_exc()
return RedirectResponse(
url="https://codey.lol/images/radio_art_default.jpg", status_code=302

View File

@ -113,9 +113,7 @@ class Transcriptions(FastAPI):
db_path: Union[str, LiteralString] = os.path.join(
"/usr/local/share", "sqlite_dbs", "sp.db"
)
db_query: str = (
"""SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
)
db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""

View File

@ -78,7 +78,7 @@ class Aggregate:
traceback.print_exc()
logging.info("Could not increment redis failed counter: %s", str(e))
self.notifier.send(
f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
f"Could not increment redis failed counter: {str(e)}",
)
return search_result

View File

@ -89,10 +89,8 @@ class Cache:
logging.debug(
"Checking whether %s is already stored", artistsong.replace("\n", " - ")
)
check_query: str = (
'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
check_query: str = 'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
)
artistsong_split = artistsong.split("\n", maxsplit=1)
artist = artistsong_split[0].lower()
song = artistsong_split[1].lower()
@ -132,7 +130,7 @@ class Cache:
f"cache::store >> {str(e)}",
)
await self.notifier.send(
f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
f"cache::store >> `{str(e)}`",
)
@ -213,8 +211,10 @@ class Cache:
lyrics = regex.sub(r"(<br>|\n|\r\n)", " / ", lyr_result.lyrics.strip())
lyrics = regex.sub(r"\s{2,}", " ", lyrics)
insert_query = "INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics)\
insert_query = (
"INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics)\
VALUES(?, ?, ?, ?, ?, ?, ?)"
)
params = (
lyr_result.src,
time.time(),
@ -258,10 +258,8 @@ class Cache:
if artist == "!" and song == "!":
random_search = True
search_query: str = (
"SELECT id, artist, song, lyrics, src, confidence\
search_query: str = "SELECT id, artist, song, lyrics, src, confidence\
FROM lyrics ORDER BY RANDOM() LIMIT 1"
)
logging.info("Searching %s - %s on %s", artist, song, self.label)
@ -319,11 +317,9 @@ class Cache:
self.cache_pre_query
) as _db_cursor:
if not random_search:
search_query: str = (
'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\
search_query: str = 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\
WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 10'
)
search_params: tuple = (
artist.strip(),
song.strip(),

View File

@ -31,7 +31,6 @@ class LastFM:
]
async with ClientSession() as session:
async with await session.get(
self.api_base_url,
params=request_params,

View File

@ -11,9 +11,11 @@ from regex import Pattern
import sqlite3
import gpt
import music_tag # type: ignore
from rapidfuzz import fuzz
from endpoints.constructors import RadioException
double_space: Pattern = regex.compile(r"\s{2,}")
non_alnum: Pattern = regex.compile(r"[^a-zA-Z0-9]")
"""
TODO:
@ -110,7 +112,36 @@ class RadioUtil:
out_result = [str(r["artistsong"]) for r in result]
return out_result
async def search_playlist(
def datatables_search(self, filter: str) -> Optional[list[dict]]:
"""DataTables Search
Args:
filter (str): The filter query to fuzzy match with
Returns:
list[str]: List of matching playlist items (if any are found)
"""
filter = filter.strip().lower()
matched: list[dict] = []
for item in self.active_playlist:
artist: str = item.get("artist", None)
song: str = item.get("song", None)
artistsong: str = item.get("artistsong", None)
album: str = item.get("album", None)
if not artist or not song or not artistsong:
continue
if non_alnum.sub("", filter) in non_alnum.sub("", artistsong).lower():
matched.append(item)
continue
if (
fuzz.ratio(filter, artist) >= 85
or fuzz.ratio(filter, song) >= 85
or fuzz.ratio(filter, album) >= 85
):
matched.append(item)
return matched
def search_playlist(
self,
artistsong: Optional[str] = None,
artist: Optional[str] = None,
@ -130,11 +161,9 @@ class RadioUtil:
if not artistsong and (not artist or not song):
raise RadioException("No query provided")
try:
search_query: str = (
'SELECT id, artist, song, (artist || " - " || song) AS artistsong, album, file_path, duration FROM tracks\
search_query: str = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, album, file_path, duration FROM tracks\
WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
)
if artistsong:
artistsong_split: list = artistsong.split(" - ", maxsplit=1)
(search_artist, search_song) = tuple(artistsong_split)
@ -153,9 +182,7 @@ class RadioUtil:
for ext in self.sqlite_exts:
db_conn.load_extension(ext)
db_conn.row_factory = sqlite3.Row
db_cursor = db_conn.execute(
search_query, search_params
)
db_cursor = db_conn.execute(search_query, search_params)
result: Optional[sqlite3.Row | bool] = db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row):
return False
@ -178,7 +205,7 @@ class RadioUtil:
traceback.print_exc()
return False
async def add_genre(self, artist: str, genre: str) -> bool:
def add_genre(self, artist: str, genre: str) -> bool:
"""
Add artist/genre pairing to DB
Args:
@ -213,7 +240,7 @@ class RadioUtil:
traceback.print_exc()
return False
async def add_genres(self, pairs: list[dict[str, str]]) -> bool:
def add_genres(self, pairs: list[dict[str, str]]) -> bool:
"""
(BATCH) Add artist/genre pairings to DB
Expects list of dicts comprised of artist name (key), genre (value)
@ -228,9 +255,7 @@ class RadioUtil:
for pair in pairs:
try:
artist, genre = pair
query: str = (
"INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
)
query: str = "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
params: tuple[str, str] = (artist, genre)
res = _db.execute(query, params)
if isinstance(res.lastrowid, int):
@ -411,11 +436,13 @@ class RadioUtil:
if isinstance(db_cursor.lastrowid, int):
db_conn.commit()
else:
logging.debug("No row inserted for track_id: %s w/ file_path: %s", track_id,
file_path)
logging.debug(
"No row inserted for track_id: %s w/ file_path: %s",
track_id,
file_path,
)
except Exception as e:
logging.debug("cache_album_art Exception: %s",
str(e))
logging.debug("cache_album_art Exception: %s", str(e))
traceback.print_exc()
async def get_album_art(self, track_id: int) -> Optional[bytes]:
@ -433,15 +460,12 @@ class RadioUtil:
query_params: tuple[int] = (track_id,)
db_cursor = db_conn.execute(query, query_params)
result: Optional[Union[sqlite3.Row, bool]] = (
db_cursor.fetchone()
)
result: Optional[Union[sqlite3.Row, bool]] = db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row):
return None
return result["album_art"]
except Exception as e:
logging.debug("get_album_art Exception: %s",
str(e))
logging.debug("get_album_art Exception: %s", str(e))
traceback.print_exc()
return None