diff --git a/endpoints/constructors.py b/endpoints/constructors.py
index 7aff060..7b19aed 100644
--- a/endpoints/constructors.py
+++ b/endpoints/constructors.py
@@ -64,6 +64,9 @@ class ValidTopKarmaRequest(BaseModel):
LastFM
"""
+class LastFMException(Exception):
+ pass
+
class ValidArtistSearchRequest(BaseModel):
"""
- **a**: artist name
@@ -81,17 +84,17 @@ class ValidArtistSearchRequest(BaseModel):
class ValidAlbumDetailRequest(BaseModel):
"""
- **a**: artist name
- - **a2**: album/release name (as sourced from here/LastFM)
+ - **release**: album/release name (as sourced from here/LastFM)
"""
a: str
- a2: str
+ release: str
class Config: # pylint: disable=missing-class-docstring
schema_extra = {
"example": {
"a": "eminem",
- "a2": "houdini"
+ "release": "houdini"
}
}
diff --git a/endpoints/karma.py b/endpoints/karma.py
index 6e63f1d..5a15645 100644
--- a/endpoints/karma.py
+++ b/endpoints/karma.py
@@ -7,6 +7,7 @@ import time
import datetime
import traceback
import aiosqlite as sqlite3
+from typing import LiteralString, Optional
from fastapi import FastAPI, Request, HTTPException
from .constructors import ValidTopKarmaRequest, ValidKarmaRetrievalRequest,\
ValidKarmaUpdateRequest
@@ -14,11 +15,16 @@ from .constructors import ValidTopKarmaRequest, ValidKarmaRetrievalRequest,\
class KarmaDB:
"""Karma DB Util"""
def __init__(self):
- self.db_path = os.path.join("/", "usr", "local", "share",
+ self.db_path: LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "karma.db")
async def get_karma(self, keyword: str) -> int | dict:
- """Get Karma Value for Keyword"""
+ """Get Karma Value for Keyword
+ Args:
+ keyword (str): The keyword to search
+ Returns:
+ int|dict
+ """
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute("SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)) as db_cursor:
try:
@@ -30,8 +36,13 @@ class KarmaDB:
'errorText': f'No records for {keyword}',
}
- async def get_top(self, n: int = 10):
- """Get Top n=10 Karma Entries"""
+ async def get_top(self, n: Optional[int] = 10) -> list[tuple]:
+ """Get Top n=10 Karma Entries
+ Args:
+ n (Optional[int]) = 10: The number of top results to return
+ Returns:
+ list[tuple]
+ """
try:
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute("SELECT keyword, score FROM karma ORDER BY score DESC LIMIT ?", (n,)) as db_cursor:
@@ -40,25 +51,32 @@ class KarmaDB:
traceback.print_exc()
return
- async def update_karma(self, granter: str, keyword: str, flag: int):
- """Update Karma for Keyword"""
+ async def update_karma(self, granter: str, keyword: str, flag: int) -> Optional[bool]:
+ """Update Karma for Keyword
+ Args:
+ granter (str): The user who granted (increased/decreased) the karma
+ keyword (str): The keyword to update
+ flag (int): 0 to increase karma, 1 to decrease karma
+ Returns:
+ Optional[bool]
+ """
+
if not flag in [0, 1]:
return
- modifier = "score + 1" if not flag else "score - 1"
- query = f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?"
- new_keyword_query = "INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)"
- friendly_flag = "++" if not flag else "--"
- audit_message = f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
- audit_query = "INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)"
- now = int(time.time())
+ modifier: str = "score + 1" if not flag else "score - 1"
+ query: str = f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?"
+ new_keyword_query: str = "INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)"
+ friendly_flag: str = "++" if not flag else "--"
+ audit_message: str = f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
+ audit_query: str = "INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)"
+ now: int = int(time.time())
logging.debug("Audit message: %s{audit_message}\nKeyword: %s{keyword}")
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(audit_query, (keyword, audit_message,)) as db_cursor:
- await db_conn.commit()
- await db_cursor.close()
+ await db_conn.commit()
async with await db_conn.execute(query, (now, keyword,)) as db_cursor:
if db_cursor.rowcount:
await db_conn.commit()
@@ -72,11 +90,6 @@ class KarmaDB:
return True
else:
return False
-
-
-
-
-
class Karma(FastAPI):
"""Karma Endpoints"""
def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
@@ -86,7 +99,7 @@ class Karma(FastAPI):
self.glob_state = glob_state
self.db = KarmaDB()
- self.endpoints = {
+ self.endpoints: dict = {
"karma/get": self.get_karma_handler,
"karma/modify": self.modify_karma_handler,
"karma/top": self.top_karma_handler,
@@ -97,23 +110,19 @@ class Karma(FastAPI):
include_in_schema=False)
- async def top_karma_handler(self, request: Request, data: ValidTopKarmaRequest | None = None):
- """
- /karma/top
- Get top keywords for karma
- (Requires key)
- """
+ async def top_karma_handler(self, request: Request, data: ValidTopKarmaRequest | None = None) -> list[tuple]|dict:
+ """Get top keywords for karma"""
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
raise HTTPException(status_code=403, detail="Unauthorized")
- n = 10
+ n: int = 10
if data:
- n = data.n
+ n: int = int(data.n)
try:
- top10 = await self.db.get_top(n=n)
+ top10: list[tuple] = await self.db.get_top(n=n)
return top10
except:
traceback.print_exc()
@@ -123,18 +132,14 @@ class Karma(FastAPI):
}
async def get_karma_handler(self, data: ValidKarmaRetrievalRequest, request: Request):
- """
- /karma/get
- Get current karma value
- (Requires key)
- """
+ """Get current karma value"""
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
raise HTTPException(status_code=403, detail="Unauthorized")
- keyword = data.keyword
+ keyword: str = data.keyword
try:
- count = await self.db.get_karma(keyword)
+ count: int|dict = await self.db.get_karma(keyword)
return {
'keyword': keyword,
'count': count,
@@ -146,12 +151,8 @@ class Karma(FastAPI):
'errorText': "Exception occurred."
}
- async def modify_karma_handler(self, data: ValidKarmaUpdateRequest, request: Request):
- """
- /karma/update
- Update karma count
- (Requires key)
- """
+ async def modify_karma_handler(self, data: ValidKarmaUpdateRequest, request: Request) -> dict:
+ """Update karma count"""
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With'), 2):
raise HTTPException(status_code=403, detail="Unauthorized")
diff --git a/endpoints/lastfm.py b/endpoints/lastfm.py
index 41992b5..ad30c5d 100644
--- a/endpoints/lastfm.py
+++ b/endpoints/lastfm.py
@@ -3,9 +3,10 @@
import importlib
import traceback
+from typing import Optional
from fastapi import FastAPI
from .constructors import ValidArtistSearchRequest, ValidAlbumDetailRequest,\
- ValidTrackInfoRequest
+ ValidTrackInfoRequest, LastFMException
class LastFM(FastAPI):
"""Last.FM Endpoints"""
@@ -34,7 +35,7 @@ class LastFM(FastAPI):
Get artist info
- **a**: Artist to search
"""
- artist = data.a.strip()
+ artist: Optional[str] = data.a.strip()
if not artist:
return {
'err': True,
@@ -53,24 +54,24 @@ class LastFM(FastAPI):
'result': artist_result
}
- async def artist_album_handler(self, data: ValidArtistSearchRequest):
+ async def artist_album_handler(self, data: ValidArtistSearchRequest) -> dict:
"""
Get artist's albums/releases
- **a**: Artist to search
"""
- artist = data.a.strip()
+ artist: str = data.a.strip()
if not artist:
return {
'err': True,
'errorText': 'No artist specified'
}
- album_result = await self.lastfm.get_artist_albums(artist=artist)
- album_result_out = []
- seen_release_titles = []
+ album_result: dict|list[dict] = await self.lastfm.get_artist_albums(artist=artist)
+ album_result_out: list = []
+ seen_release_titles: list = []
for release in album_result:
- release_title = release.get('title')
+ release_title: str = release.get('title')
if release_title.lower() in seen_release_titles:
continue
seen_release_titles.append(release_title.lower())
@@ -81,14 +82,14 @@ class LastFM(FastAPI):
'result': album_result_out
}
- async def release_detail_handler(self, data: ValidAlbumDetailRequest):
+ async def release_detail_handler(self, data: ValidAlbumDetailRequest) -> dict:
"""
Get details of a particular release by an artist
- **a**: Artist to search
- - **a2**: Release title to search (subject to change)
+ - **release**: Release title to search
"""
- artist = data.a.strip()
- release = data.a2.strip()
+ artist: str = data.a.strip()
+ release: str = data.release.strip()
if not artist or not release:
return {
@@ -110,14 +111,14 @@ class LastFM(FastAPI):
'result': ret_obj
}
- async def release_tracklist_handler(self, data: ValidAlbumDetailRequest):
+ async def release_tracklist_handler(self, data: ValidAlbumDetailRequest) -> dict:
"""
Get track list for a particular release by an artist
- **a**: Artist to search
- - **a2**: Release title to search (subject to change)
+ - **release**: Release title to search
"""
- artist = data.a.strip()
- release = data.a2.strip()
+ artist: str = data.a.strip()
+ release: str = data.release.strip()
if not artist or not release:
return {
@@ -125,7 +126,7 @@ class LastFM(FastAPI):
'errorText': 'Invalid request'
}
- tracklist_result = await self.lastfm.get_album_tracklist(artist=artist, album=release)
+ tracklist_result: dict = await self.lastfm.get_album_tracklist(artist=artist, album=release)
return {
'success': True,
'id': tracklist_result.get('id'),
@@ -135,15 +136,15 @@ class LastFM(FastAPI):
'tracks': tracklist_result.get('tracks')
}
- async def track_info_handler(self, data: ValidTrackInfoRequest):
+ async def track_info_handler(self, data: ValidTrackInfoRequest) -> dict:
"""
Get track info from Last.FM given an artist/track
- **a**: Artist to search
- **t**: Track title to search
"""
try:
- artist = data.a
- track = data.t
+ artist: str = data.a
+ track: str = data.t
if not artist or not track:
return {
@@ -151,8 +152,10 @@ class LastFM(FastAPI):
'errorText': 'Invalid request'
}
- track_info_result = await self.lastfm.get_track_info(artist=artist, track=track)
- assert not "err" in track_info_result.keys()
+ track_info_result: dict = await self.lastfm.get_track_info(artist=artist, track=track)
+ if "err" in track_info_result:
+ raise LastFMException("Unknown error occurred: %s",
+ track_info_result.get('errorText', '??'))
return {
'success': True,
'result': track_info_result
diff --git a/endpoints/lyric_search.py b/endpoints/lyric_search.py
index ca34f15..13cd38d 100644
--- a/endpoints/lyric_search.py
+++ b/endpoints/lyric_search.py
@@ -7,27 +7,29 @@ import urllib.parse
import regex
import aiosqlite as sqlite3
from fastapi import FastAPI, HTTPException
+from typing import LiteralString, Optional, Pattern
from .constructors import ValidTypeAheadRequest, ValidLyricRequest
+from lyric_search.constructors import LyricsResult
from lyric_search.sources import aggregate
from lyric_search import notifier
class CacheUtils:
"""Lyrics Cache DB Utils"""
def __init__(self):
- self.lyrics_db_path = os.path.join("/", "usr", "local", "share",
+ self.lyrics_db_path: LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "cached_lyrics.db")
- async def check_typeahead(self, s: str, pre_query: str | None = None):
+ async def check_typeahead(self, s: str, pre_query: str | None = None) -> Optional[list[dict]]:
"""Check s against artists stored - for typeahead"""
async with sqlite3.connect(self.lyrics_db_path,
timeout=2) as db_conn:
db_conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])
if not pre_query:
- query = "SELECT distinct(artist) FROM lyrics WHERE artist LIKE ? LIMIT 15"
- query_params = (f"%{s}%",)
+ query: str = "SELECT distinct(artist) FROM lyrics WHERE artist LIKE ? LIMIT 15"
+ query_params: tuple = (f"%{s}%",)
else:
- query = "SELECT distinct(song) FROM lyrics WHERE artist LIKE ? AND song LIKE ? LIMIT 15"
- query_params = (f"%{pre_query}%", f"%{s}%",)
+ query: str = "SELECT distinct(song) FROM lyrics WHERE artist LIKE ? AND song LIKE ? LIMIT 15"
+ query_params: tuple = (f"%{pre_query}%", f"%{s}%",)
async with await db_conn.execute(query, query_params) as db_cursor:
return await db_cursor.fetchall()
@@ -43,14 +45,14 @@ class LyricSearch(FastAPI):
self.notifier = notifier.DiscordNotifier()
- self.endpoints = {
+ self.endpoints: dict = {
"typeahead/artist": self.artist_typeahead_handler,
"typeahead/song": self.song_typeahead_handler,
"lyric_search": self.lyric_search_handler, # Preserving old endpoint path temporarily
"lyric/search": self.lyric_search_handler,
}
- self.acceptable_request_sources = [
+ self.acceptable_request_sources: list = [
"WEB",
"WEB-RADIO",
"IRC-MS",
@@ -62,25 +64,25 @@ class LyricSearch(FastAPI):
"LIMNORIA-SHARED",
]
- self.lrc_regex = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}')
+ self.lrc_regex: Pattern = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}')
for endpoint, handler in self.endpoints.items():
_schema_include = endpoint in ["lyric/search"]
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include)
- async def artist_typeahead_handler(self, data: ValidTypeAheadRequest):
+ async def artist_typeahead_handler(self, data: ValidTypeAheadRequest) -> list[str]|dict:
"""Artist Type Ahead Handler"""
if not isinstance(data.query, str) or len(data.query) < 2:
return {
'err': True,
'errorText': 'Invalid request',
}
- query = data.query
- typeahead_result = await self.cache_utils.check_typeahead(query)
- typeahead_list = [str(r.get('artist')) for r in typeahead_result]
+ query: str = data.query
+ typeahead_result: Optional[list[dict]] = await self.cache_utils.check_typeahead(query)
+ typeahead_list: list[str] = [str(r.get('artist')) for r in typeahead_result]
return typeahead_list
- async def song_typeahead_handler(self, data: ValidTypeAheadRequest):
+ async def song_typeahead_handler(self, data: ValidTypeAheadRequest) -> list[str]|dict:
"""Song Type Ahead Handler"""
if not isinstance(data.pre_query, str)\
or not isinstance(data.query, str|None):
@@ -88,13 +90,13 @@ class LyricSearch(FastAPI):
'err': True,
'errorText': 'Invalid request',
}
- pre_query = data.pre_query
- query = data.query
- typeahead_result = await self.cache_utils.check_typeahead(query, pre_query)
- typeahead_list = [str(r.get('song')) for r in typeahead_result]
+ pre_query: str = data.pre_query
+ query: str = data.query
+ typeahead_result: Optional[list[dict]] = await self.cache_utils.check_typeahead(query, pre_query)
+ typeahead_list: list[str] = [str(r.get('song')) for r in typeahead_result]
return typeahead_list
- async def lyric_search_handler(self, data: ValidLyricRequest):
+ async def lyric_search_handler(self, data: ValidLyricRequest) -> dict:
"""
Search for lyrics
@@ -129,15 +131,15 @@ class LyricSearch(FastAPI):
if search_artist and search_song:
- search_artist = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip())
- search_song = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip())
- search_artist = urllib.parse.unquote(search_artist)
- search_song = urllib.parse.unquote(search_song)
+ search_artist: str = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip())
+ search_song: str = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip())
+ search_artist: str = urllib.parse.unquote(search_artist)
+ search_song: str = urllib.parse.unquote(search_song)
- excluded_sources = data.excluded_sources
+ excluded_sources: list = data.excluded_sources
aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
- plain_lyrics = not data.lrc
- result = await aggregate_search.search(search_artist, search_song, plain_lyrics)
+ plain_lyrics: bool = not data.lrc
+ result: Optional[LyricsResult] = await aggregate_search.search(search_artist, search_song, plain_lyrics)
if not result:
return {
@@ -145,15 +147,15 @@ class LyricSearch(FastAPI):
'errorText': 'Sources exhausted, lyrics not located.',
}
- result = result.todict()
+ result: dict = result.todict()
if data.sub and not data.lrc:
- seeked_found_line = None
- lyric_lines = result['lyrics'].strip().split(" / ")
+ seeked_found_line: Optional[int] = None
+ lyric_lines: list[str] = result['lyrics'].strip().split(" / ")
for i, line in enumerate(lyric_lines):
- line = regex.sub(r'\u2064', '', line.strip())
+ line: str = regex.sub(r'\u2064', '', line.strip())
if data.sub.strip().lower() in line.strip().lower():
- seeked_found_line = i
+ seeked_found_line: int = i
logging.debug("Found %s at %s, match for %s!",
line, seeked_found_line, data.sub) # REMOVEME: DEBUG
break
diff --git a/endpoints/misc.py b/endpoints/misc.py
index 25efc01..65e0bbf 100644
--- a/endpoints/misc.py
+++ b/endpoints/misc.py
@@ -2,6 +2,7 @@
# pylint: disable=bare-except, broad-exception-caught, invalid-name
import time
+from typing import Optional
from fastapi import FastAPI
import redis.asyncio as redis
from lyric_search.sources import private, cache as LyricsCache, redis_cache
@@ -18,7 +19,7 @@ class Misc(FastAPI):
self.redis_cache = redis_cache.RedisCache()
self.redis_client = redis.Redis(password=private.REDIS_PW)
self.radio = radio
- self.endpoints = {
+ self.endpoints: dict = {
"widget/redis": self.homepage_redis_widget,
"widget/sqlite": self.homepage_sqlite_widget,
"widget/lyrics": self.homepage_lyrics_widget,
@@ -29,54 +30,35 @@ class Misc(FastAPI):
app.add_api_route(f"/{endpoint}", handler, methods=["GET"],
include_in_schema=False)
- async def get_radio_np(self) -> dict:
+ async def get_radio_np(self) -> str:
"""
Get radio now playing
- Uses XC endpoint
Args:
None
Returns:
str: Radio now playing in artist - song format
"""
- json_payload = {
- 'bid': 0,
- 'cmd': 'radio_metadata',
- 'key': f'Bearer {self.radio_pubkey}',
- }
-
- headers = {
- 'content-type': 'application/json; charset=utf-8',
- }
-
- artistsong = self.radio.now_playing['artistsong']
+ artistsong: Optional[str] = self.radio.radio_util.now_playing['artistsong']
if not isinstance(artistsong, str):
return "N/A - N/A"
return artistsong
async def homepage_redis_widget(self) -> dict:
- """
- /widget/redis
- Homepage Redis Widget Handler
- Args:
- None
- Returns:
- dict
- """
+ """Homepage Redis Widget Handler"""
# Measure response time w/ test lyric search
time_start: float = time.time() # Start time for response_time
test_lyrics_result = await self.redis_client.ft().search("@artist: test @song: test")
time_end: float = time.time()
# End response time test
-
total_keys = await self.redis_client.dbsize()
response_time: float = time_end - time_start
(_, ci_keys) = await self.redis_client.scan(cursor=0, match="ci_session*", count=10000000)
num_ci_keys = len(ci_keys)
index_info = await self.redis_client.ft().info()
- indexed_lyrics = index_info.get('num_docs')
+ indexed_lyrics: int = index_info.get('num_docs')
return {
'responseTime': round(response_time, 7),
'storedKeys': total_keys,
@@ -85,17 +67,11 @@ class Misc(FastAPI):
}
async def homepage_sqlite_widget(self) -> dict:
- """
- /widget/sqlite
- Homepage SQLite Widget Handler
- Args:
- None
- Returns:
- dict
- """
- row_count = await self.lyr_cache.sqlite_rowcount()
- distinct_artists = await self.lyr_cache.sqlite_distinct("artist")
- lyrics_length = await self.lyr_cache.sqlite_lyrics_length()
+ """Homepage SQLite Widget Handler"""
+
+ row_count: int = await self.lyr_cache.sqlite_rowcount()
+ distinct_artists: int = await self.lyr_cache.sqlite_distinct("artist")
+ lyrics_length: int = await self.lyr_cache.sqlite_lyrics_length()
return {
'storedRows': row_count,
'distinctArtists': distinct_artists,
@@ -103,26 +79,12 @@ class Misc(FastAPI):
}
async def homepage_lyrics_widget(self) -> dict:
- """
- /widget/lyrics
- Homepage Lyrics Widget Handler
- Args:
- None
- Returns:
- dict
- """
- counts = await self.redis_cache.get_found_counts()
- return counts
+ """Homepage Lyrics Widget Handler"""
+
+ return await self.redis_cache.get_found_counts()
async def homepage_radio_widget(self) -> dict:
- """
- /widget/radio
- Homepage Radio Widget Handler
- Args:
- None
- Returns:
- dict
- """
+ """Homepage Radio Widget Handler"""
return {
'now_playing': await self.get_radio_np(),
diff --git a/endpoints/radio.py b/endpoints/radio.py
index f1d3a47..bbfa343 100644
--- a/endpoints/radio.py
+++ b/endpoints/radio.py
@@ -13,18 +13,12 @@ from . import radio_util
from .constructors import ValidRadioNextRequest, ValidRadioReshuffleRequest, ValidRadioQueueShiftRequest,\
ValidRadioQueueRemovalRequest, ValidRadioSongRequest, RadioException
from uuid import uuid4 as uuid
-from typing import Optional
+from typing import Optional, LiteralString
from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException
from fastapi.responses import RedirectResponse
from aiohttp import ClientSession, ClientTimeout
# pylint: disable=bare-except, broad-exception-caught, invalid-name
-double_space = regex.compile(r'\s{2,}')
-
-"""
-TODO:
- minor refactoring/type annotations/docstrings
-"""
class Radio(FastAPI):
"""Radio Endpoints"""
@@ -34,24 +28,8 @@ class Radio(FastAPI):
self.constants = constants
self.radio_util = radio_util.RadioUtil(self.constants)
self.glob_state = glob_state
- self.ls_uri = "http://10.10.10.101:29000"
- self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
- self.active_playlist_path = os.path.join("/usr/local/share",
- "sqlite_dbs", "track_file_map.db")
- self.active_playlist_name = "default" # not used
- self.active_playlist = []
- self.now_playing = {
- 'artist': 'N/A',
- 'song': 'N/A',
- 'genre': 'N/A',
- 'artistsong': 'N/A - N/A',
- 'duration': 0,
- 'start': 0,
- 'end': 0,
- 'file_path': None,
- 'id': None,
- }
- self.endpoints = {
+
+ self.endpoints: dict = {
"radio/np": self.radio_now_playing,
"radio/request": self.radio_request,
"radio/get_queue": self.radio_get_queue,
@@ -69,30 +47,8 @@ class Radio(FastAPI):
# NOTE: Not in loop because method is GET for this endpoint
app.add_api_route("/radio/album_art", self.album_art_handler, methods=["GET"],
include_in_schema=True)
- asyncio.get_event_loop().run_until_complete(self.load_playlist())
- asyncio.get_event_loop().run_until_complete(self._ls_skip())
-
-
- def get_queue_item_by_uuid(self, uuid: str) -> tuple[int, dict] | None:
- """
- Get queue item by UUID
- Args:
- uuid: The UUID to search
- Returns:
- dict|None
- """
- for x, item in enumerate(self.active_playlist):
- if item.get('uuid') == uuid:
- return (x, item)
- return None
-
- async def _ls_skip(self) -> bool:
- async with ClientSession() as session:
- async with session.get(f"{self.ls_uri}/next",
- timeout=ClientTimeout(connect=2, sock_read=2)) as request:
- request.raise_for_status()
- text = await request.text()
- return text == "OK"
+ asyncio.get_event_loop().run_until_complete(self.radio_util.load_playlist())
+ asyncio.get_event_loop().run_until_complete(self.radio_util._ls_skip())
async def radio_skip(self, data: ValidRadioNextRequest, request: Request) -> bool:
"""
@@ -102,11 +58,11 @@ class Radio(FastAPI):
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
if data.skipTo:
- (x, _) = self.get_queue_item_by_uuid(data.skipTo)
- self.active_playlist = self.active_playlist[x:]
- if not self.active_playlist:
- await self.load_playlist()
- return await self._ls_skip()
+ (x, _) = self.radio_util.get_queue_item_by_uuid(data.skipTo)
+ self.radio_util.active_playlist: list = self.radio_util.active_playlist[x:]
+ if not self.radio_util.active_playlist:
+ await self.radio_util.load_playlist()
+ return await self.radio_util._ls_skip()
except Exception as e:
traceback.print_exc()
return False
@@ -119,7 +75,7 @@ class Radio(FastAPI):
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
- random.shuffle(self.active_playlist)
+ random.shuffle(self.radio_util.active_playlist)
return {
'ok': True
}
@@ -128,13 +84,9 @@ class Radio(FastAPI):
async def radio_get_queue(self, request: Request, limit: int = 20_000) -> dict:
"""
Get current play queue, up to limit n [default: 20k]
- Args:
- limit (int): Number of results to return (default 20k)
- Returns:
- dict
"""
queue_out = []
- for x, item in enumerate(self.active_playlist[0:limit+1]):
+ for x, item in enumerate(self.radio_util.active_playlist[0:limit+1]):
queue_out.append({
'pos': x,
'id': item.get('id'),
@@ -153,11 +105,11 @@ class Radio(FastAPI):
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
- (x, item) = self.get_queue_item_by_uuid(data.uuid)
- self.active_playlist.pop(x)
- self.active_playlist.insert(0, item)
+ (x, item) = self.radio_util.get_queue_item_by_uuid(data.uuid)
+ self.radio_util.active_playlist.pop(x)
+ self.radio_util.active_playlist.insert(0, item)
if not data.next:
- await self._ls_skip()
+ await self.radio_util._ls_skip()
return {
'ok': True,
}
@@ -167,158 +119,25 @@ class Radio(FastAPI):
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
- (x, found_item) = self.get_queue_item_by_uuid(data.uuid)
+ (x, found_item) = self.radio_util.get_queue_item_by_uuid(data.uuid)
if not found_item:
return {
'ok': False,
'err': 'UUID not found in play queue',
}
- self.active_playlist.pop(x)
+ self.radio_util.active_playlist.pop(x)
return {
'ok': True,
}
-
- async def search_playlist(self, artistsong: str|None = None, artist: str|None = None, song: str|None = None) -> bool:
- if artistsong and (artist or song):
- raise RadioException("Cannot search using combination provided")
- if not artistsong and (not artist or not song):
- raise RadioException("No query provided")
- try:
- search_artist = None
- search_song = None
- search_query = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, genre, file_path, duration FROM tracks\
- WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
- <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
- if artistsong:
- artistsong_split = artistsong.split(" - ", maxsplit=1)
- (search_artist, search_song) = tuple(artistsong_split)
- else:
- search_artist = artist
- search_song = song
- if not artistsong:
- artistsong = f"{search_artist} - {search_song}"
- search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)
- async with sqlite3.connect(self.active_playlist_path,
- timeout=2) as db_conn:
- await db_conn.enable_load_extension(True)
- for ext in self.sqlite_exts:
- await db_conn.load_extension(ext)
- db_conn.row_factory = sqlite3.Row
- async with await db_conn.execute(search_query, search_params) as db_cursor:
- result = await db_cursor.fetchone()
- if not result:
- return False
- pushObj = {
- 'id': result['id'],
- 'uuid': str(uuid().hex),
- 'artist': result['artist'].strip(),
- 'song': result['song'].strip(),
- 'artistsong': result['artistsong'].strip(),
- 'genre': result['genre'],
- 'file_path': result['file_path'],
- 'duration': result['duration'],
- }
- self.active_playlist.insert(0, pushObj)
- return True
- except Exception as e:
- logging.critical("search_playlist:: Search error occurred: %s", str(e))
- traceback.print_exc()
- return False
-
- async def load_playlist(self):
- try:
- logging.info(f"Loading playlist...")
- self.active_playlist.clear()
- # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
- # GROUP BY artistdashsong ORDER BY RANDOM()'
-
- """
- LIMITED GENRES
- """
-
- db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
- WHERE genre IN ("metalcore", "pop punk", "punk rock", "metal", "punk", "electronic", "nu metal", "EDM",\
- "post-hardcore", "pop rock", "experimental", "post-punk", "death metal", "electronicore", "hard rock", "psychedelic rock",\
- "grunge", "house", "dubstep", "hardcore", "hair metal", "horror punk", "folk punk", "breakcore",\
- "post-rock", "deathcore", "hardcore punk", "synthwave", "trap") GROUP BY artistdashsong ORDER BY RANDOM()'
-
- async with sqlite3.connect(self.active_playlist_path,
- timeout=2) as db_conn:
- db_conn.row_factory = sqlite3.Row
- async with await db_conn.execute(db_query) as db_cursor:
- results = await db_cursor.fetchall()
- self.active_playlist = [{
- 'uuid': str(uuid().hex),
- 'id': r['id'],
- 'artist': double_space.sub(' ', r['artist']).strip(),
- 'song': double_space.sub(' ', r['song']).strip(),
- 'genre': r['genre'] if r['genre'] else 'Unknown',
- 'artistsong': double_space.sub(' ', r['artistdashsong']).strip(),
- 'file_path': r['file_path'],
- 'duration': r['duration'],
- } for r in results]
- logging.info("Populated active playlists with %s items",
- len(self.active_playlist))
- except:
- traceback.print_exc()
-
- async def cache_album_art(self, track_id: int, album_art: bytes) -> None:
- try:
- async with sqlite3.connect(self.active_playlist_path,
- timeout=2) as db_conn:
- async with await db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?",
- (album_art, track_id,)) as db_cursor:
- await db_conn.commit()
- except:
- traceback.print_exc()
-
- async def get_album_art(self, track_id: Optional[int] = None,
- file_path: Optional[str] = None) -> bytes:
- try:
- async with sqlite3.connect(self.active_playlist_path,
- timeout=2) as db_conn:
- db_conn.row_factory = sqlite3.Row
- query = "SELECT album_art FROM tracks WHERE id = ?"
- query_params = (track_id,)
-
- if file_path and not track_id:
- query = "SELECT album_art FROM tracks WHERE file_path = ?"
- query_params = (file_path,)
-
- async with await db_conn.execute(query,
- query_params) as db_cursor:
- result = await db_cursor.fetchone()
- if not result:
- return
- return result['album_art']
- except:
- traceback.print_exc()
- return
-
- async def _get_album_art(self, track_id: Optional[int] = None, file_path: Optional[str] = None) -> bytes|None:
- try:
- if not file_path:
- file_path = self.now_playing.get('file_path')
-
- if not file_path:
- logging.critical("_get_album_art:: No current file")
- return
- original_file_path = file_path
- file_path = file_path.replace("/paul/toons/",
- "/singer/gogs_toons/")
- cached_album_art = await self.get_album_art(file_path=original_file_path,
- track_id=track_id)
- if cached_album_art:
- return cached_album_art
- except:
- traceback.print_exc()
-
- # TODO: Optimize/cache
async def album_art_handler(self, request: Request, track_id: Optional[int] = None) -> bytes:
+ """
+ Get album art, optional parameter track_id may be specified.
+ Otherwise, current track album art will be pulled.
+ """
try:
logging.debug("Seeking album art with trackId: %s", track_id)
- album_art = await self._get_album_art(track_id=track_id)
+ album_art: Optional[bytes] = await self.radio_util._get_album_art(track_id=track_id)
if not album_art:
return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg",
status_code=302)
@@ -330,16 +149,14 @@ class Radio(FastAPI):
status_code=302)
async def radio_now_playing(self, request: Request) -> dict:
- ret_obj = {**self.now_playing}
- cur_elapsed = self.now_playing.get('elapsed', -1)
- cur_duration = self.now_playing.get('duration', 999999)
+ ret_obj: dict = {**self.radio_util.now_playing}
+ cur_elapsed: int = self.radio_util.now_playing.get('elapsed', -1)
+ cur_duration: int = self.radio_util.now_playing.get('duration', 999999)
try:
ret_obj['elapsed'] = int(time.time()) - ret_obj['start']
except KeyError:
traceback.print_exc()
ret_obj['elapsed'] = 0
- elapsed = ret_obj['elapsed']
- duration = ret_obj['duration']
ret_obj.pop('file_path')
return ret_obj
@@ -348,36 +165,31 @@ class Radio(FastAPI):
background_tasks: BackgroundTasks) -> Optional[dict]:
"""
Get next track
- Args:
- None
- Returns:
- str: Next track in queue
-
- Track will be removed from the queue in the process (pop from top of list).
+ Track will be removed from the queue in the process.
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
- if not isinstance(self.active_playlist, list) or not self.active_playlist:
- await self.load_playlist()
- await self._ls_skip()
+ if not isinstance(self.radio_util.active_playlist, list) or not self.radio_util.active_playlist:
+ await self.radio_util.load_playlist()
+ await self.radio_util._ls_skip()
return
- next = self.active_playlist.pop(0)
+ next = self.radio_util.active_playlist.pop(0)
if not isinstance(next, dict):
logging.critical("next is of type: %s, reloading playlist...", type(next))
- await self.load_playlist()
- await self._ls_skip()
+ await self.radio_util.load_playlist()
+ await self.radio_util._ls_skip()
return
- duration = next['duration']
- time_started = int(time.time())
- time_ends = int(time_started + duration)
+ duration: int = next['duration']
+ time_started: int = int(time.time())
+ time_ends: int = int(time_started + duration)
- if len(self.active_playlist) > 1:
- self.active_playlist.append(next) # Push to end of playlist
+ if len(self.radio_util.active_playlist) > 1:
+ self.radio_util.active_playlist.append(next) # Push to end of playlist
else:
- await self.load_playlist()
+ await self.radio_util.load_playlist()
- self.now_playing = next
+ self.radio_util.now_playing: dict = next
next['start'] = time_started
next['end'] = time_ends
try:
@@ -385,9 +197,9 @@ class Radio(FastAPI):
except Exception as e:
traceback.print_exc()
try:
- if not await self.get_album_art(file_path=next['file_path']):
- album_art = await self._get_album_art(next['file_path'])
- await self.cache_album_art(next['id'], album_art)
+ if not await self.radio_util.get_album_art(file_path=next['file_path']):
+ album_art = await self.radio_util._get_album_art(next['file_path'])
+ await self.radio_util.cache_album_art(next['id'], album_art)
except:
traceback.print_exc()
return next
@@ -396,9 +208,9 @@ class Radio(FastAPI):
async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> Response:
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
- artistsong = data.artistsong
- artist = data.artist
- song = data.song
+ artistsong: str = data.artistsong
+ artist: str = data.artist
+ song: str = data.song
if artistsong and (artist or song):
return {
'err': True,
@@ -410,9 +222,11 @@ class Radio(FastAPI):
'errorText': 'Invalid request',
}
- search = await self.search_playlist(artistsong=artistsong,
+ search: bool = await self.radio_util.search_playlist(artistsong=artistsong,
artist=artist,
song=song)
if data.alsoSkip:
- await self._ls_skip()
- return {'result': search}
\ No newline at end of file
+ await self.radio_util._ls_skip()
+ return {
+ 'result': search
+ }
\ No newline at end of file
diff --git a/endpoints/radio_util.py b/endpoints/radio_util.py
index 9417217..e99472a 100644
--- a/endpoints/radio_util.py
+++ b/endpoints/radio_util.py
@@ -7,15 +7,40 @@ Radio Utils
import logging
import traceback
import time
+import regex
import datetime
+import os
import gpt
from aiohttp import ClientSession, ClientTimeout
+import aiosqlite as sqlite3
+from typing import Optional, LiteralString
+from uuid import uuid4 as uuid
+from .constructors import RadioException
+
+double_space = regex.compile(r'\s{2,}')
class RadioUtil:
def __init__(self, constants) -> None:
self.constants = constants
self.gpt = gpt.GPT(self.constants)
- self.webhooks = {
+ self.ls_uri: str = "http://10.10.10.101:29000"
+ self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
+ self.active_playlist_path: str|LiteralString = os.path.join("/usr/local/share",
+ "sqlite_dbs", "track_file_map.db")
+ self.active_playlist_name = "default" # not used
+ self.active_playlist: list = []
+ self.now_playing: dict = {
+ 'artist': 'N/A',
+ 'song': 'N/A',
+ 'genre': 'N/A',
+ 'artistsong': 'N/A - N/A',
+ 'duration': 0,
+ 'start': 0,
+ 'end': 0,
+ 'file_path': None,
+ 'id': None,
+ }
+ self.webhooks: dict = {
'gpt': {
'hook': self.constants.GPT_WEBHOOK,
},
@@ -23,6 +48,7 @@ class RadioUtil:
'hook': self.constants.SFM_WEBHOOK,
}
}
+
def duration_conv(self, s: int|float) -> str:
"""
Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
@@ -32,8 +58,169 @@ class RadioUtil:
str
"""
return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]
+
+ async def search_playlist(self, artistsong: str|None = None, artist: str|None = None, song: str|None = None) -> bool:
+ if artistsong and (artist or song):
+ raise RadioException("Cannot search using combination provided")
+ if not artistsong and (not artist or not song):
+ raise RadioException("No query provided")
+ try:
+ search_artist: Optional[str] = None
+ search_song: Optional[str] = None
+ search_query: str = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, genre, file_path, duration FROM tracks\
+ WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
+ <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
+ if artistsong:
+ artistsong_split: list = artistsong.split(" - ", maxsplit=1)
+ (search_artist, search_song) = tuple(artistsong_split)
+ else:
+ search_artist: str = artist
+ search_song: str = song
+ if not artistsong:
+ artistsong: str = f"{search_artist} - {search_song}"
+ search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)
+ async with sqlite3.connect(self.active_playlist_path,
+ timeout=2) as db_conn:
+ await db_conn.enable_load_extension(True)
+ for ext in self.sqlite_exts:
+ await db_conn.load_extension(ext)
+ db_conn.row_factory = sqlite3.Row
+ async with await db_conn.execute(search_query, search_params) as db_cursor:
+ result: Optional[sqlite3.Row|bool] = await db_cursor.fetchone()
+ if not result:
+ return False
+ pushObj: dict = {
+ 'id': result['id'],
+ 'uuid': str(uuid().hex),
+ 'artist': result['artist'].strip(),
+ 'song': result['song'].strip(),
+ 'artistsong': result['artistsong'].strip(),
+ 'genre': result['genre'],
+ 'file_path': result['file_path'],
+ 'duration': result['duration'],
+ }
+ self.active_playlist.insert(0, pushObj)
+ return True
+ except Exception as e:
+ logging.critical("search_playlist:: Search error occurred: %s", str(e))
+ traceback.print_exc()
+ return False
+
+ async def load_playlist(self):
+ try:
+ logging.info(f"Loading playlist...")
+ self.active_playlist.clear()
+ # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
+ # GROUP BY artistdashsong ORDER BY RANDOM()'
+
+ """
+ LIMITED GENRES
+ """
+
+ db_query: str = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
+ WHERE genre IN ("metalcore", "pop punk", "punk rock", "metal", "punk", "electronic", "nu metal", "EDM",\
+ "post-hardcore", "pop rock", "experimental", "post-punk", "death metal", "electronicore", "hard rock", "psychedelic rock",\
+ "grunge", "house", "dubstep", "hardcore", "hair metal", "horror punk", "folk punk", "breakcore",\
+ "post-rock", "deathcore", "hardcore punk", "synthwave", "trap") GROUP BY artistdashsong ORDER BY RANDOM()'
+
+ async with sqlite3.connect(self.active_playlist_path,
+ timeout=2) as db_conn:
+ db_conn.row_factory = sqlite3.Row
+ async with await db_conn.execute(db_query) as db_cursor:
+ results: Optional[list[sqlite3.Row]] = await db_cursor.fetchall()
+ self.active_playlist: list[dict] = [{
+ 'uuid': str(uuid().hex),
+ 'id': r['id'],
+ 'artist': double_space.sub(' ', r['artist']).strip(),
+ 'song': double_space.sub(' ', r['song']).strip(),
+ 'genre': r['genre'] if r['genre'] else 'Unknown',
+ 'artistsong': double_space.sub(' ', r['artistdashsong']).strip(),
+ 'file_path': r['file_path'],
+ 'duration': r['duration'],
+ } for r in results]
+ logging.info("Populated active playlists with %s items",
+ len(self.active_playlist))
+ except:
+ traceback.print_exc()
+
+ async def cache_album_art(self, track_id: int, album_art: bytes) -> None:
+ try:
+ async with sqlite3.connect(self.active_playlist_path,
+ timeout=2) as db_conn:
+ async with await db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?",
+ (album_art, track_id,)) as db_cursor:
+ await db_conn.commit()
+ except:
+ traceback.print_exc()
+
+ async def get_album_art(self, track_id: Optional[int] = None,
+ file_path: Optional[str] = None) -> bytes:
+ try:
+ async with sqlite3.connect(self.active_playlist_path,
+ timeout=2) as db_conn:
+ db_conn.row_factory = sqlite3.Row
+ query: str = "SELECT album_art FROM tracks WHERE id = ?"
+ query_params: tuple = (track_id,)
+
+ if file_path and not track_id:
+ query: str = "SELECT album_art FROM tracks WHERE file_path = ?"
+ query_params: tuple = (file_path,)
+
+ async with await db_conn.execute(query,
+ query_params) as db_cursor:
+ result: Optional[sqlite3.Row|bool] = await db_cursor.fetchone()
+ if not result:
+ return
+ return result['album_art']
+ except:
+ traceback.print_exc()
+ return
+
+ async def _get_album_art(self, track_id: Optional[int] = None, file_path: Optional[str] = None) -> Optional[bytes]:
+ try:
+ if not file_path:
+ file_path: Optional[str] = self.now_playing.get('file_path')
+
+ if not file_path:
+ logging.critical("_get_album_art:: No current file")
+ return
+ original_file_path: Optional[str] = file_path
+ file_path: Optional[str] = file_path.replace("/paul/toons/",
+ "/singer/gogs_toons/")
+ cached_album_art: Optional[bytes|bool] = await self.get_album_art(file_path=original_file_path,
+ track_id=track_id)
+ if cached_album_art:
+ return cached_album_art
+ except:
+ traceback.print_exc()
+
+ def get_queue_item_by_uuid(self, uuid: str) -> Optional[tuple[int, dict]]:
+ """
+ Get queue item by UUID
+ Args:
+ uuid: The UUID to search
+ Returns:
+ dict|None
+ """
+ for x, item in enumerate(self.active_playlist):
+ if item.get('uuid') == uuid:
+ return (x, item)
+ return None
+
+ async def _ls_skip(self) -> bool:
+ try:
+ async with ClientSession() as session:
+ async with session.get(f"{self.ls_uri}/next",
+ timeout=ClientTimeout(connect=2, sock_read=2)) as request:
+ request.raise_for_status()
+ text: Optional[str] = await request.text()
+ return text == "OK"
+ except Exception as e:
+ logging.debug("Skip failed: %s", str(e))
- async def get_ai_song_info(self, artist: str, song: str) -> str|None:
+ return False # failsafe
+
+ async def get_ai_song_info(self, artist: str, song: str) -> Optional[str]:
"""
Get AI Song Info
Args:
@@ -42,7 +229,7 @@ class RadioUtil:
Returns:
str|None
"""
- response = await self.gpt.get_completion(prompt=f"I am going to listen to {song} by {artist}.")
+ response: Optional[str] = await self.gpt.get_completion(prompt=f"I am going to listen to {song} by {artist}.")
if not response:
logging.critical("No response received from GPT?")
return
@@ -58,10 +245,11 @@ class RadioUtil:
None
"""
+
# First, send track info
- friendly_track_start = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['start']))
- friendly_track_end = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['end']))
- hook_data = {
+ friendly_track_start: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['start']))
+ friendly_track_end: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['end']))
+ hook_data: dict = {
'username': 'serious.FM',
"embeds": [{
"title": "Now Playing",
@@ -95,7 +283,7 @@ class RadioUtil:
}]
}
- sfm_hook = self.webhooks['sfm'].get('hook')
+ sfm_hook: str = self.webhooks['sfm'].get('hook')
async with ClientSession() as session:
async with await session.post(sfm_hook, json=hook_data,
timeout=ClientTimeout(connect=5, sock_read=5), headers={
@@ -104,9 +292,12 @@ class RadioUtil:
# Next, AI feedback
- ai_response = await self.get_ai_song_info(track['artist'],
+ ai_response: Optional[str] = await self.get_ai_song_info(track['artist'],
track['song'])
- hook_data = {
+ if not ai_response:
+ return
+
+ hook_data: dict = {
'username': 'GPT',
"embeds": [{
"title": "AI Feedback",
@@ -115,7 +306,7 @@ class RadioUtil:
}]
}
- ai_hook = self.webhooks['gpt'].get('hook')
+ ai_hook: str = self.webhooks['gpt'].get('hook')
async with ClientSession() as session:
async with await session.post(ai_hook, json=hook_data,
timeout=ClientTimeout(connect=5, sock_read=5), headers={
diff --git a/endpoints/rand_msg.py b/endpoints/rand_msg.py
index ebfcca9..0998ece 100644
--- a/endpoints/rand_msg.py
+++ b/endpoints/rand_msg.py
@@ -23,7 +23,7 @@ class RandMsg(FastAPI):
Get a randomly generated message
"""
random.seed()
- short = data.short if data else False
+ short: bool = data.short if data else False
if not short:
db_rand_selected = random.choice([0, 1, 3])
else:
diff --git a/endpoints/transcriptions.py b/endpoints/transcriptions.py
index bed447f..ad93da3 100644
--- a/endpoints/transcriptions.py
+++ b/endpoints/transcriptions.py
@@ -3,17 +3,18 @@
import os
import aiosqlite as sqlite3
from fastapi import FastAPI
+from typing import Optional, LiteralString
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
class Transcriptions(FastAPI):
"""Transcription Endpoints"""
- def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
+ def __init__(self, app: FastAPI, util, constants, glob_state) -> None: # pylint: disable=super-init-not-called
self.app = app
self.util = util
self.constants = constants
self.glob_state = glob_state
- self.endpoints = {
+ self.endpoints: dict = {
"transcriptions/get_episodes": self.get_episodes_handler,
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
#tbd
@@ -23,15 +24,13 @@ class Transcriptions(FastAPI):
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=False)
- async def get_episodes_handler(self, data: ValidShowEpisodeListRequest):
- """
- /transcriptions/get_episodes
- Get list of episodes by show id
- """
- show_id = data.s
- db_path = None
- db_query = None
- show_title = None
+ async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> dict:
+ """Get list of episodes by show id"""
+
+ show_id: int = data.s
+ db_path: Optional[str|LiteralString] = None
+ db_query: Optional[str] = None
+ show_title: Optional[str] = None
if show_id is None:
return {
@@ -39,7 +38,7 @@ class Transcriptions(FastAPI):
'errorText': 'Invalid request'
}
- show_id = int(show_id)
+ show_id: int = int(show_id)
if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
return {
@@ -49,20 +48,20 @@ class Transcriptions(FastAPI):
match show_id:
case 0:
- db_path = os.path.join("/", "usr", "local", "share",
+ db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "sp.db")
- db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
- show_title = "South Park"
+ db_query: str = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
+ show_title: str = "South Park"
case 1:
- db_path = os.path.join("/", "usr", "local", "share",
+ db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "futur.db")
- db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
- show_title = "Futurama"
+ db_query: str = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
+ show_title: str = "Futurama"
case 2:
- db_path = os.path.join("/", "usr", "local", "share",
+ db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "parks.db")
- db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
- show_title = "Parks And Rec"
+ db_query: str = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
+ show_title: str = "Parks And Rec"
case _:
return {
'err': True,
@@ -71,7 +70,7 @@ class Transcriptions(FastAPI):
await self.glob_state.increment_counter('transcript_list_requests')
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
- result = await _cursor.fetchall()
+ result: list[tuple] = await _cursor.fetchall()
return {
"show_title": show_title,
"episodes": [
@@ -81,26 +80,24 @@ class Transcriptions(FastAPI):
} for item in result]
}
- async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest):
- """/transcriptions/get_episode_lines
- Get lines for a particular episode
- """
- show_id = data.s
- episode_id = data.e
+ async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> dict:
+ """Get lines for a particular episode"""
+ show_id: int = data.s
+ episode_id: int = data.e
# pylint: disable=line-too-long
match show_id:
case 0:
- db_path = os.path.join("/", "usr", "local", "share",
+ db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "sp.db")
- db_query = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
+ db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
- db_path = os.path.join("/", "usr", "local", "share",
+ db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "futur.db")
- db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "
Opener: " || EP_OPENER || ""), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
+ db_query: str = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "
Opener: " || EP_OPENER || ""), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
case 2:
- db_path = os.path.join("/", "usr", "local", "share",
+ db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "parks.db")
- db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
+ db_query: str = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
case _:
return {
@@ -110,10 +107,10 @@ class Transcriptions(FastAPI):
await self.glob_state.increment_counter('transcript_requests')
async with sqlite3.connect(database=db_path, timeout=1) as _db:
- params = (episode_id,)
+ params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor:
- result = await _cursor.fetchall()
- first_result = result[0]
+ result: list[tuple] = await _cursor.fetchall()
+ first_result: tuple = result[0]
return {
'episode_id': episode_id,
'ep_friendly': first_result[0].strip(),
diff --git a/endpoints/xc.py b/endpoints/xc.py
index 27d6f01..5f4498a 100644
--- a/endpoints/xc.py
+++ b/endpoints/xc.py
@@ -1,21 +1,22 @@
#!/usr/bin/env python3.12
-# pylint: disable=invalid-name
+import logging
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from aiohttp import ClientSession, ClientTimeout
from .constructors import ValidXCRequest
+# pylint: disable=invalid-name
class XC(FastAPI):
"""XC (CrossComm) Endpoints"""
- def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
+ def __init__(self, app: FastAPI, util, constants, glob_state) -> None: # pylint: disable=super-init-not-called
self.app = app
self.util = util
self.constants = constants
self.glob_state = glob_state
- self.endpoints = {
+ self.endpoints: dict = {
"xc": self.xc_handler,
}
@@ -23,25 +24,18 @@ class XC(FastAPI):
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=False)
- # async def put_ws_handler(self, ws: WebSocket):
- # await ws.accept()
- # await self.audio_streamer.handle_client(ws)
-
- async def xc_handler(self, data: ValidXCRequest, request: Request):
- """
- /xc
- Handle XC Commands
- """
+ async def xc_handler(self, data: ValidXCRequest, request: Request) -> dict:
+ """Handle XC Commands"""
try:
- key = data.key
- bid = data.bid
- cmd = data.cmd
- cmd_data = data.data
+ key: str = data.key
+ bid: int = data.bid
+ cmd: str = data.cmd
+ cmd_data: dict = data.data
if not self.util.check_key(path=request.url.path, req_type=0, key=key):
raise HTTPException(status_code=403, detail="Unauthorized")
- BID_ADDR_MAP = {
+ BID_ADDR_MAP: dict = {
0: '10.10.10.101:5991', # Patrick (a.k.a. Thomas a.k.a. Aces)
# TODO: add Havoc?
}
@@ -52,15 +46,15 @@ class XC(FastAPI):
'errorText': 'Invalid bot id'
}
- bot_api_url = f'http://{BID_ADDR_MAP[bid]}/'
+ bot_api_url: str = f'http://{BID_ADDR_MAP[bid]}/'
async with ClientSession() as session:
async with await session.post(f"{bot_api_url}{cmd}", json=cmd_data, headers={
'Content-Type': 'application/json; charset=utf-8'
}, timeout=ClientTimeout(connect=5, sock_read=5)) as request:
- response = await request.json()
+ response: dict = await request.json()
return {
'success': True,
'response': response
}
- except:
- pass
\ No newline at end of file
+ except Exception as e:
+ logging.debug("Error: %s", str(e))
\ No newline at end of file
diff --git a/endpoints/yt.py b/endpoints/yt.py
index 93c89c7..c509b31 100644
--- a/endpoints/yt.py
+++ b/endpoints/yt.py
@@ -3,18 +3,19 @@
import importlib
from fastapi import FastAPI
from pydantic import BaseModel
+from typing import Optional
from .constructors import ValidYTSearchRequest
class YT(FastAPI):
"""YT Endpoints"""
- def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
+ def __init__(self, app: FastAPI, util, constants, glob_state) -> None: # pylint: disable=super-init-not-called
self.app = app
self.util = util
self.constants = constants
self.glob_state = glob_state
self.ytsearch = importlib.import_module("youtube_search_async").YoutubeSearch()
- self.endpoints = {
+ self.endpoints: dict = {
"yt/search": self.yt_video_search_handler,
#tbd
}
@@ -23,15 +24,20 @@ class YT(FastAPI):
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=True)
- async def yt_video_search_handler(self, data: ValidYTSearchRequest):
+ async def yt_video_search_handler(self, data: ValidYTSearchRequest) -> dict:
"""
Search for YT Video by Title (closest match returned)
- **t**: Title to search
"""
- title = data.t
- yts_res = await self.ytsearch.search(title)
- yt_video_id = yts_res[0].get('id', False)
+ title: str = data.t
+ yts_res: Optional[list[dict]] = await self.ytsearch.search(title)
+ if not yts_res:
+ return {
+ 'err': True,
+ 'errorText': 'No result.',
+ }
+ yt_video_id: str|bool = yts_res[0].get('id', False)
return {
'video_id': yt_video_id,
diff --git a/lastfm_wrapper.py b/lastfm_wrapper.py
index aa345d4..129c4f7 100644
--- a/lastfm_wrapper.py
+++ b/lastfm_wrapper.py
@@ -47,7 +47,7 @@ class LastFM:
'err': 'Failed'
}
- async def get_track_info(self, artist=None, track=None):
+ async def get_track_info(self, artist=None, track=None) -> dict:
"""Get Track Info from LastFM"""
try:
if artist is None or track is None:
@@ -74,7 +74,7 @@ class LastFM:
'err': 'General Failure'
}
- async def get_album_tracklist(self, artist=None, album=None):
+ async def get_album_tracklist(self, artist=None, album=None) -> dict:
"""Get Album Tracklist"""
try:
if artist is None or album is None:
@@ -98,7 +98,7 @@ class LastFM:
'err': 'General Failure'
}
- async def get_artist_albums(self, artist=None):
+ async def get_artist_albums(self, artist=None) -> dict|list[dict]:
"""Get Artists Albums from LastFM"""
try:
if artist is None: