This commit is contained in:
codey 2025-02-15 21:09:33 -05:00
parent 60416c493f
commit 39d1ddaffa
22 changed files with 509 additions and 525 deletions

View File

@ -2,71 +2,32 @@
# pylint: disable=bare-except, broad-exception-caught, invalid-name # pylint: disable=bare-except, broad-exception-caught, invalid-name
import logging import logging
import traceback
import regex import regex
from regex import Pattern from regex import Pattern
from typing import Union from typing import Union
from aiohttp import ClientSession, ClientTimeout from aiohttp import ClientSession, ClientTimeout
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks from fastapi import FastAPI, Request, HTTPException
from .constructors import ValidHookSongRequest, ValidAISongRequest from fastapi.responses import JSONResponse
class AI(FastAPI): class AI(FastAPI):
"""AI Endpoints""" """AI Endpoints"""
def __init__(self, app: FastAPI, my_util, constants): # pylint: disable=super-init-not-called def __init__(self, app: FastAPI,
self.app = app my_util, constants): # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = my_util self.util = my_util
self.constants = constants self.constants = constants
self.url_clean_regex: Pattern = regex.compile(r'^\/ai\/(openai|base)\/') self.url_clean_regex: Pattern = regex.compile(r'^\/ai\/(openai|base)\/')
self.endpoints: dict = { self.endpoints: dict = {
"ai/openai": self.ai_openai_handler, "ai/openai": self.ai_openai_handler,
"ai/base": self.ai_handler, "ai/base": self.ai_handler,
"ai/song": self.ai_song_handler,
"ai/hook": self.ai_hook_handler,
#tbd #tbd
} }
for endpoint, handler in self.endpoints.items(): for endpoint, handler in self.endpoints.items():
app.add_api_route(f"/{endpoint}", handler, methods=["GET", "POST"], app.add_api_route(f"/{endpoint}", handler, methods=["GET", "POST"],
include_in_schema=False) include_in_schema=False)
async def respond_via_webhook(self, data: ValidHookSongRequest, originalRequest: Request) -> bool:
"""Respond via Webhook"""
try:
logging.debug("Request received: %s", data)
data2 = data.copy()
del data2.hook
if not data.hook:
return False
response = await self.ai_song_handler(data2, originalRequest)
if not response.get('resp'):
logging.critical("NO RESP!")
return False
response = response.get('resp')
hook_data = {
'username': 'Claude',
"embeds": [{
"title": "Claude's Feedback",
"description": response,
"footer": {
"text": "Current model: claude-3-haiku-20240307",
}
}]
}
async with ClientSession() as session:
async with await session.post(data.hook, json=hook_data,
timeout=ClientTimeout(connect=5, sock_read=5), headers={
'content-type': 'application/json; charset=utf-8',}) as request:
request.raise_for_status()
return True
except:
traceback.print_exc()
return False
async def ai_handler(self, request: Request): async def ai_handler(self, request: Request) -> JSONResponse:
""" """
/ai/base /ai/base
AI BASE Request AI BASE Request
@ -88,15 +49,15 @@ class AI(FastAPI):
headers=local_llm_headers, headers=local_llm_headers,
timeout=ClientTimeout(connect=15, sock_read=30)) as out_request: timeout=ClientTimeout(connect=15, sock_read=30)) as out_request:
response = await out_request.json() response = await out_request.json()
return response return JSONResponse(content=response)
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
logging.error("Error: %s", e) logging.error("Error: %s", e)
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'General Failure' 'errorText': 'General Failure'
} })
async def ai_openai_handler(self, request: Request): async def ai_openai_handler(self, request: Request) -> JSONResponse:
""" """
/ai/openai /ai/openai
AI Request AI Request
@ -122,70 +83,10 @@ class AI(FastAPI):
headers=local_llm_headers, headers=local_llm_headers,
timeout=ClientTimeout(connect=15, sock_read=30)) as out_request: timeout=ClientTimeout(connect=15, sock_read=30)) as out_request:
response = await out_request.json() response = await out_request.json()
return response return JSONResponse(content=response)
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
logging.error("Error: %s", e) logging.error("Error: %s", e)
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'General Failure' 'errorText': 'General Failure'
} })
async def ai_hook_handler(self, data: ValidHookSongRequest, request: Request, background_tasks: BackgroundTasks):
"""AI Hook Handler"""
background_tasks.add_task(self.respond_via_webhook, data, request)
return {
'success': True,
}
async def ai_song_handler(self, data: Union[ValidAISongRequest, ValidHookSongRequest], request: Request):
"""
/ai/song
AI (Song Info) Request [Public]
"""
ai_prompt = "You are a helpful assistant who will provide tidbits of info on songs the user may listen to."
ai_question = f"I am going to listen to the song \"{data.s}\" by \"{data.a}\"."
local_llm_headers = {
'x-api-key': self.constants.CLAUDE_API_KEY,
'anthropic-version': '2023-06-01',
'content-type': 'application/json',
}
request_data = {
'model': 'claude-3-haiku-20240307',
'max_tokens': 512,
'temperature': 0.6,
'system': ai_prompt,
'messages': [
{
"role": "user",
"content": ai_question.strip(),
}
]
}
try:
async with ClientSession() as session:
async with await session.post('https://api.anthropic.com/v1/messages',
json=request_data,
headers=local_llm_headers,
timeout=ClientTimeout(connect=15, sock_read=30)) as aiohttp_request:
response = await aiohttp_request.json()
logging.debug("Response: %s",
response)
if response.get('type') == 'error':
error_type = response.get('error').get('type')
error_message = response.get('error').get('message')
result = {
'resp': f"{error_type} error ({error_message})"
}
else:
result = {
'resp': response.get('content')[0].get('text').strip()
}
return result
except Exception as e: # pylint: disable=broad-exception-caught
logging.error("Error: %s", e)
return {
'err': True,
'errorText': 'General Failure'
}

View File

@ -6,30 +6,6 @@ from pydantic import BaseModel
# Constructors # Constructors
# TODO: REORDER # TODO: REORDER
"""
AI
"""
class ValidAISongRequest(BaseModel):
"""
- **a**: artist
- **s**: track title
"""
a: str
s: str
class ValidHookSongRequest(BaseModel):
"""
- **a**: artist
- **s**: track title
- **hook**: hook to return
"""
a: str
s: str
hook: str | None = ""
""" """
Karma Karma
""" """
@ -58,7 +34,7 @@ class ValidTopKarmaRequest(BaseModel):
""" """
- **n**: Number of top results to return (default: 10) - **n**: Number of top results to return (default: 10)
""" """
n: int | None = 10 n: Optional[int] = 10
""" """
LastFM LastFM
@ -124,7 +100,7 @@ class RandMsgRequest(BaseModel):
- **short**: Short randmsg? - **short**: Short randmsg?
""" """
short: Optional[bool] = False short: Optional[bool]
""" """
YT YT
@ -152,7 +128,7 @@ class ValidXCRequest(BaseModel):
key: str key: str
bid: int bid: int
cmd: str cmd: str
data: dict | None = None data: Optional[dict]
""" """
Transcriptions Transcriptions
@ -190,14 +166,14 @@ class ValidLyricRequest(BaseModel):
- **excluded_sources**: sources to exclude (new only) - **excluded_sources**: sources to exclude (new only)
""" """
a: str | None = None a: Optional[str] = None
s: str | None = None s: Optional[str] = None
t: str | None = None t: Optional[str] = None
sub: str | None = None sub: Optional[str] = None
extra: bool | None = False extra: Optional[bool] = False
lrc: bool | None = False lrc: Optional[bool] = False
src: str src: str
excluded_sources: list | None = None excluded_sources: Optional[list] = None
model_config = { model_config = {
"json_schema_extra": { "json_schema_extra": {
@ -218,7 +194,7 @@ class ValidTypeAheadRequest(BaseModel):
""" """
- **query**: query string - **query**: query string
""" """
pre_query: str|None = None pre_query: Optional[str] = None
query: str query: str
""" """
@ -237,10 +213,10 @@ class ValidRadioSongRequest(BaseModel):
- **alsoSkip**: Whether to skip immediately to this track [not implemented] - **alsoSkip**: Whether to skip immediately to this track [not implemented]
""" """
key: str key: str
artist: str | None = None artist: Optional[str] = None
song: str | None = None song: Optional[str] = None
artistsong: str | None = None artistsong: Optional[str] = None
alsoSkip: bool = False alsoSkip: Optional[bool] = False
class ValidRadioQueueGetRequest(BaseModel): class ValidRadioQueueGetRequest(BaseModel):
""" """
@ -248,8 +224,8 @@ class ValidRadioQueueGetRequest(BaseModel):
- **limit**: optional, default: 15k - **limit**: optional, default: 15k
""" """
key: str|None = None key: Optional[str] = None
limit: int|None = 15000 limit: Optional[int] = 15_000
class ValidRadioNextRequest(BaseModel): class ValidRadioNextRequest(BaseModel):
""" """
@ -257,7 +233,7 @@ class ValidRadioNextRequest(BaseModel):
- **skipTo**: UUID to skip to [optional] - **skipTo**: UUID to skip to [optional]
""" """
key: str key: str
skipTo: str|None = None skipTo: Optional[str] = None
class ValidRadioReshuffleRequest(ValidRadioNextRequest): class ValidRadioReshuffleRequest(ValidRadioNextRequest):
""" """
@ -272,7 +248,7 @@ class ValidRadioQueueShiftRequest(BaseModel):
""" """
key: str key: str
uuid: str uuid: str
next: bool = False next: Optional[bool] = False
class ValidRadioQueueRemovalRequest(BaseModel): class ValidRadioQueueRemovalRequest(BaseModel):
""" """

View File

@ -7,8 +7,9 @@ import time
import datetime import datetime
import traceback import traceback
import aiosqlite as sqlite3 import aiosqlite as sqlite3
from typing import LiteralString, Optional from typing import LiteralString, Optional, Union
from fastapi import FastAPI, Request, HTTPException from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from .constructors import ValidTopKarmaRequest, ValidKarmaRetrievalRequest,\ from .constructors import ValidTopKarmaRequest, ValidKarmaRetrievalRequest,\
ValidKarmaUpdateRequest ValidKarmaUpdateRequest
@ -18,12 +19,12 @@ class KarmaDB:
self.db_path: LiteralString = os.path.join("/", "usr", "local", "share", self.db_path: LiteralString = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "karma.db") "sqlite_dbs", "karma.db")
async def get_karma(self, keyword: str) -> int | dict: async def get_karma(self, keyword: str) -> Union[int, dict]:
"""Get Karma Value for Keyword """Get Karma Value for Keyword
Args: Args:
keyword (str): The keyword to search keyword (str): The keyword to search
Returns: Returns:
int|dict Union[int, dict]
""" """
async with sqlite3.connect(self.db_path, timeout=2) as db_conn: async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute("SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)) as db_cursor: async with await db_conn.execute("SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)) as db_cursor:
@ -37,7 +38,8 @@ class KarmaDB:
} }
async def get_top(self, n: Optional[int] = 10) -> Optional[list[tuple]]: async def get_top(self, n: Optional[int] = 10) -> Optional[list[tuple]]:
"""Get Top n=10 Karma Entries """
Get Top n=10 Karma Entries
Args: Args:
n (Optional[int]) = 10: The number of top results to return n (Optional[int]) = 10: The number of top results to return
Returns: Returns:
@ -51,8 +53,10 @@ class KarmaDB:
traceback.print_exc() traceback.print_exc()
return None return None
async def update_karma(self, granter: str, keyword: str, flag: int) -> Optional[bool]: async def update_karma(self, granter: str, keyword: str,
"""Update Karma for Keyword flag: int) -> Optional[bool]:
"""
Update Karma for Keyword
Args: Args:
granter (str): The user who granted (increased/decreased) the karma granter (str): The user who granted (increased/decreased) the karma
keyword (str): The keyword to update keyword (str): The keyword to update
@ -93,9 +97,11 @@ class KarmaDB:
return False return False
class Karma(FastAPI): class Karma(FastAPI):
"""Karma Endpoints""" """
def __init__(self, app: FastAPI, util, constants): # pylint: disable=super-init-not-called Karma Endpoints
self.app = app """
def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = util self.util = util
self.constants = constants self.constants = constants
self.db = KarmaDB() self.db = KarmaDB()
@ -111,8 +117,11 @@ class Karma(FastAPI):
include_in_schema=False) include_in_schema=False)
async def top_karma_handler(self, request: Request, data: ValidTopKarmaRequest | None = None) -> list[tuple]|dict: async def top_karma_handler(self, request: Request,
"""Get top keywords for karma""" data: Optional[ValidTopKarmaRequest] = None) -> JSONResponse:
"""
Get top keywords for karma
"""
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')): if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
raise HTTPException(status_code=403, detail="Unauthorized") raise HTTPException(status_code=403, detail="Unauthorized")
@ -125,19 +134,20 @@ class Karma(FastAPI):
try: try:
top10: Optional[list[tuple]] = await self.db.get_top(n=n) top10: Optional[list[tuple]] = await self.db.get_top(n=n)
if not top10: if not top10:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'General failure', 'errorText': 'General failure',
} })
return top10 return JSONResponse(content=top10)
except: except:
traceback.print_exc() traceback.print_exc()
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Exception occurred.', 'errorText': 'Exception occurred.',
} })
async def get_karma_handler(self, data: ValidKarmaRetrievalRequest, request: Request): async def get_karma_handler(self, data: ValidKarmaRetrievalRequest,
request: Request) -> JSONResponse:
"""Get current karma value""" """Get current karma value"""
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')): if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
@ -145,30 +155,32 @@ class Karma(FastAPI):
keyword: str = data.keyword keyword: str = data.keyword
try: try:
count: int|dict = await self.db.get_karma(keyword) count: Union[int, dict] = await self.db.get_karma(keyword)
return { return JSONResponse(content={
'keyword': keyword, 'keyword': keyword,
'count': count, 'count': count,
} })
except: except:
traceback.print_exc() traceback.print_exc()
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': "Exception occurred." 'errorText': "Exception occurred.",
} })
async def modify_karma_handler(self, data: ValidKarmaUpdateRequest, request: Request) -> dict: async def modify_karma_handler(self, data: ValidKarmaUpdateRequest,
request: Request) -> JSONResponse:
"""Update karma count""" """Update karma count"""
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With'), 2): if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With'), 2):
raise HTTPException(status_code=403, detail="Unauthorized") raise HTTPException(status_code=403, detail="Unauthorized")
if not data.flag in [0, 1]: if not data.flag in [0, 1]:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request' 'errorText': 'Invalid request',
} })
return { return JSONResponse(content={
'success': await self.db.update_karma(data.granter, data.keyword, data.flag) 'success': await self.db.update_karma(data.granter,
} data.keyword, data.flag)
})

View File

@ -3,15 +3,17 @@
import importlib import importlib
import traceback import traceback
from typing import Optional from typing import Optional, Union
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.responses import JSONResponse
from .constructors import ValidArtistSearchRequest, ValidAlbumDetailRequest,\ from .constructors import ValidArtistSearchRequest, ValidAlbumDetailRequest,\
ValidTrackInfoRequest, LastFMException ValidTrackInfoRequest, LastFMException
class LastFM(FastAPI): class LastFM(FastAPI):
"""Last.FM Endpoints""" """Last.FM Endpoints"""
def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called def __init__(self, app: FastAPI,
self.app = app util, constants) -> None: # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = util self.util = util
self.constants = constants self.constants = constants
self.lastfm = importlib.import_module("lastfm_wrapper").LastFM() self.lastfm = importlib.import_module("lastfm_wrapper").LastFM()
@ -29,43 +31,43 @@ class LastFM(FastAPI):
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=True) include_in_schema=True)
async def artist_by_name_handler(self, data: ValidArtistSearchRequest): async def artist_by_name_handler(self, data: ValidArtistSearchRequest) -> JSONResponse:
""" """
Get artist info Get artist info
- **a**: Artist to search - **a**: Artist to search
""" """
artist: Optional[str] = data.a.strip() artist: Optional[str] = data.a.strip()
if not artist: if not artist:
return { return JSONResponse(content={
'err': True, 'err': True,
'errorText': 'No artist specified' 'errorText': 'No artist specified',
} })
artist_result = await self.lastfm.search_artist(artist=artist) artist_result = await self.lastfm.search_artist(artist=artist)
if not artist_result or "err" in artist_result.keys(): if not artist_result or "err" in artist_result.keys():
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Search failed (no results?)' 'errorText': 'Search failed (no results?)',
} })
return { return JSONResponse(content={
'success': True, 'success': True,
'result': artist_result 'result': artist_result,
} })
async def artist_album_handler(self, data: ValidArtistSearchRequest) -> dict: async def artist_album_handler(self, data: ValidArtistSearchRequest) -> JSONResponse:
""" """
Get artist's albums/releases Get artist's albums/releases
- **a**: Artist to search - **a**: Artist to search
""" """
artist: str = data.a.strip() artist: str = data.a.strip()
if not artist: if not artist:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'No artist specified' 'errorText': 'Invalid request: No artist specified',
} })
album_result: dict|list[dict] = await self.lastfm.get_artist_albums(artist=artist) album_result: Union[dict, list[dict]] = await self.lastfm.get_artist_albums(artist=artist)
album_result_out: list = [] album_result_out: list = []
seen_release_titles: list = [] seen_release_titles: list = []
@ -76,12 +78,12 @@ class LastFM(FastAPI):
seen_release_titles.append(release_title.lower()) seen_release_titles.append(release_title.lower())
album_result_out.append(release) album_result_out.append(release)
return { return JSONResponse(content={
'success': True, 'success': True,
'result': album_result_out 'result': album_result_out
} })
async def release_detail_handler(self, data: ValidAlbumDetailRequest) -> dict: async def release_detail_handler(self, data: ValidAlbumDetailRequest) -> JSONResponse:
""" """
Get details of a particular release by an artist Get details of a particular release by an artist
- **a**: Artist to search - **a**: Artist to search
@ -91,10 +93,10 @@ class LastFM(FastAPI):
release: str = data.release.strip() release: str = data.release.strip()
if not artist or not release: if not artist or not release:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request' 'errorText': 'Invalid request',
} })
release_result = await self.lastfm.get_release(artist=artist, album=release) release_result = await self.lastfm.get_release(artist=artist, album=release)
ret_obj = { ret_obj = {
@ -102,15 +104,15 @@ class LastFM(FastAPI):
'artists': release_result.get('artists'), 'artists': release_result.get('artists'),
'title': release_result.get('title'), 'title': release_result.get('title'),
'summary': release_result.get('summary'), 'summary': release_result.get('summary'),
'tracks': release_result.get('tracks') 'tracks': release_result.get('tracks'),
} }
return { return JSONResponse(content={
'success': True, 'success': True,
'result': ret_obj 'result': ret_obj,
} })
async def release_tracklist_handler(self, data: ValidAlbumDetailRequest) -> dict: async def release_tracklist_handler(self, data: ValidAlbumDetailRequest) -> JSONResponse:
""" """
Get track list for a particular release by an artist Get track list for a particular release by an artist
- **a**: Artist to search - **a**: Artist to search
@ -120,22 +122,22 @@ class LastFM(FastAPI):
release: str = data.release.strip() release: str = data.release.strip()
if not artist or not release: if not artist or not release:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request' 'errorText': 'Invalid request',
} })
tracklist_result: dict = await self.lastfm.get_album_tracklist(artist=artist, album=release) tracklist_result: dict = await self.lastfm.get_album_tracklist(artist=artist, album=release)
return { return JSONResponse(content={
'success': True, 'success': True,
'id': tracklist_result.get('id'), 'id': tracklist_result.get('id'),
'artists': tracklist_result.get('artists'), 'artists': tracklist_result.get('artists'),
'title': tracklist_result.get('title'), 'title': tracklist_result.get('title'),
'summary': tracklist_result.get('summary'), 'summary': tracklist_result.get('summary'),
'tracks': tracklist_result.get('tracks') 'tracks': tracklist_result.get('tracks'),
} })
async def track_info_handler(self, data: ValidTrackInfoRequest) -> dict: async def track_info_handler(self, data: ValidTrackInfoRequest) -> JSONResponse:
""" """
Get track info from Last.FM given an artist/track Get track info from Last.FM given an artist/track
- **a**: Artist to search - **a**: Artist to search
@ -146,22 +148,23 @@ class LastFM(FastAPI):
track: str = data.t track: str = data.t
if not artist or not track: if not artist or not track:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request' 'errorText': 'Invalid request'
} })
track_info_result: dict = await self.lastfm.get_track_info(artist=artist, track=track) track_info_result: dict = await self.lastfm.get_track_info(artist=artist,
track=track)
if "err" in track_info_result: if "err" in track_info_result:
raise LastFMException("Unknown error occurred: %s", raise LastFMException("Unknown error occurred: %s",
track_info_result.get('errorText', '??')) track_info_result.get('errorText', '??'))
return { return JSONResponse(content={
'success': True, 'success': True,
'result': track_info_result 'result': track_info_result
} })
except: except:
traceback.print_exc() traceback.print_exc()
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'General error', 'errorText': 'General error',
} })

View File

@ -7,7 +7,8 @@ import urllib.parse
import regex import regex
import aiosqlite as sqlite3 import aiosqlite as sqlite3
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from typing import LiteralString, Optional, Callable from fastapi.responses import JSONResponse
from typing import LiteralString, Optional, Union
from regex import Pattern from regex import Pattern
from .constructors import ValidTypeAheadRequest, ValidLyricRequest from .constructors import ValidTypeAheadRequest, ValidLyricRequest
from lyric_search.constructors import LyricsResult from lyric_search.constructors import LyricsResult
@ -15,13 +16,18 @@ from lyric_search.sources import aggregate
from lyric_search import notifier from lyric_search import notifier
class CacheUtils: class CacheUtils:
"""Lyrics Cache DB Utils""" """
Lyrics Cache DB Utils
"""
def __init__(self) -> None: def __init__(self) -> None:
self.lyrics_db_path: LiteralString = os.path.join("/usr/local/share", self.lyrics_db_path: LiteralString = os.path.join("/usr/local/share",
"sqlite_dbs", "cached_lyrics.db") "sqlite_dbs", "cached_lyrics.db")
async def check_typeahead(self, s: str, pre_query: str | None = None) -> list[dict]: async def check_typeahead(self, s: str,
"""Check s against artists stored - for typeahead""" pre_query: Optional[str] = None) -> list[dict]:
"""
Check s against artists stored - for typeahead
"""
async with sqlite3.connect(self.lyrics_db_path, async with sqlite3.connect(self.lyrics_db_path,
timeout=2) as db_conn: timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row db_conn.row_factory = sqlite3.Row
@ -36,9 +42,12 @@ class CacheUtils:
class LyricSearch(FastAPI): class LyricSearch(FastAPI):
"""Lyric Search Endpoint""" """
def __init__(self, app: FastAPI, util, constants): # pylint: disable=super-init-not-called Lyric Search Endpoint
self.app = app """
def __init__(self, app: FastAPI,
util, constants) -> None: # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = util self.util = util
self.constants = constants self.constants = constants
self.cache_utils = CacheUtils() self.cache_utils = CacheUtils()
@ -70,36 +79,39 @@ class LyricSearch(FastAPI):
_schema_include = endpoint in ["lyric/search"] _schema_include = endpoint in ["lyric/search"]
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include) app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include)
async def artist_typeahead_handler(self, data: ValidTypeAheadRequest) -> list[str]|dict: async def artist_typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse:
"""Artist Type Ahead Handler""" """
Artist Type Ahead Handler
"""
if not isinstance(data.query, str) or len(data.query) < 2: if not isinstance(data.query, str) or len(data.query) < 2:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request', 'errorText': 'Invalid request',
} })
query: str = data.query query: str = data.query
typeahead_result: list[dict] = await self.cache_utils.check_typeahead(query) typeahead_result: list[dict] = await self.cache_utils.check_typeahead(query)
typeahead_list: list[str] = [str(r.get('artist')) for r in typeahead_result] typeahead_list: list[str] = [str(r.get('artist')) for r in typeahead_result]
return typeahead_list return JSONResponse(content=typeahead_list)
async def song_typeahead_handler(self, data: ValidTypeAheadRequest) -> list[str]|dict: async def song_typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse:
"""Song Type Ahead Handler""" """
Song Type Ahead Handler
"""
if not isinstance(data.pre_query, str)\ if not isinstance(data.pre_query, str)\
or not isinstance(data.query, str|None): or not isinstance(data.query, str):
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request', 'errorText': 'Invalid request',
} })
pre_query: str = data.pre_query pre_query: str = data.pre_query
query: str = data.query query: str = data.query
typeahead_result: list[dict] = await self.cache_utils.check_typeahead(query, pre_query) typeahead_result: list[dict] = await self.cache_utils.check_typeahead(query, pre_query)
typeahead_list: list[str] = [str(r.get('song')) for r in typeahead_result] typeahead_list: list[str] = [str(r.get('song')) for r in typeahead_result]
return typeahead_list return JSONResponse(content=typeahead_list)
async def lyric_search_handler(self, data: ValidLyricRequest) -> dict: async def lyric_search_handler(self, data: ValidLyricRequest) -> JSONResponse:
""" """
Search for lyrics Search for lyrics
- **a**: artist - **a**: artist
- **s**: song - **s**: song
- **t**: track (artist and song combined) [used only if a & s are not used] - **t**: track (artist and song combined) [used only if a & s are not used]
@ -109,26 +121,23 @@ class LyricSearch(FastAPI):
- **src**: the script/utility which initiated the request - **src**: the script/utility which initiated the request
- **excluded_sources**: sources to exclude [optional, default: none] - **excluded_sources**: sources to exclude [optional, default: none]
""" """
if (not data.a or not data.s) and not data.t or not data.src: if (not data.a or not data.s) and not data.t or not data.src:
raise HTTPException(detail="Invalid request", status_code=500) raise HTTPException(detail="Invalid request", status_code=500)
if data.src.upper() not in self.acceptable_request_sources: if data.src.upper() not in self.acceptable_request_sources:
await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
f"Unknown request source: {data.src}") f"Unknown request source: {data.src}")
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': f'Unknown request source: {data.src}', 'errorText': f'Unknown request source: {data.src}',
} })
if not data.t: if not data.t:
search_artist: Optional[str] = data.a search_artist: Optional[str] = data.a
search_song: Optional[str] = data.s search_song: Optional[str] = data.s
else: else:
t_split = data.t.split(" - ", maxsplit=1) t_split: tuple = tuple(data.t.split(" - ", maxsplit=1))
search_artist = t_split[0] (search_artist, search_song) = t_split
search_song = t_split[1]
if search_artist and search_song: if search_artist and search_song:
search_artist = str(self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip())) search_artist = str(self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip()))
@ -137,21 +146,21 @@ class LyricSearch(FastAPI):
search_song = urllib.parse.unquote(search_song) search_song = urllib.parse.unquote(search_song)
if not isinstance(search_artist, str) or not isinstance(search_song, str): if not isinstance(search_artist, str) or not isinstance(search_song, str):
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request', 'errorText': 'Invalid request',
} })
excluded_sources: Optional[list] = data.excluded_sources excluded_sources: Optional[list] = data.excluded_sources
aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources) aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
plain_lyrics: bool = not data.lrc plain_lyrics: bool = not data.lrc
result: Optional[LyricsResult|dict] = await aggregate_search.search(search_artist, search_song, plain_lyrics) result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search(search_artist, search_song, plain_lyrics)
if not result: if not result:
return { return JSONResponse(content={
'err': True, 'err': True,
'errorText': 'Sources exhausted, lyrics not located.', 'errorText': 'Sources exhausted, lyrics not located.',
} })
result = vars(result) result = vars(result)
@ -167,9 +176,11 @@ class LyricSearch(FastAPI):
break break
if not seeked_found_line: if not seeked_found_line:
return { return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Seek (a.k.a. subsearch) failed.',
'failed_seek': True, 'failed_seek': True,
} })
result['lyrics'] = " / ".join(lyric_lines[seeked_found_line:]) result['lyrics'] = " / ".join(lyric_lines[seeked_found_line:])
result['confidence'] = int(result.get('confidence', 0)) result['confidence'] = int(result.get('confidence', 0))
@ -188,4 +199,4 @@ class LyricSearch(FastAPI):
if not data.extra: if not data.extra:
result.pop('src') result.pop('src')
return result return JSONResponse(content=result)

View File

@ -2,18 +2,20 @@
# pylint: disable=bare-except, broad-exception-caught, invalid-name # pylint: disable=bare-except, broad-exception-caught, invalid-name
import time import time
import logging
from typing import Optional from typing import Optional
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.responses import JSONResponse
import redis.asyncio as redis import redis.asyncio as redis
from lyric_search.sources import private, cache as LyricsCache, redis_cache from lyric_search.sources import private, cache as LyricsCache, redis_cache
class Misc(FastAPI): class Misc(FastAPI):
"""Misc Endpoints""" """Misc Endpoints"""
def __init__(self, app: FastAPI, my_util, constants, radio): # pylint: disable=super-init-not-called def __init__(self, app: FastAPI, my_util,
self.app = app constants, radio) -> None: # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = my_util self.util = my_util
self.constants = constants self.constants = constants
self.radio_pubkey: str = "XC-AJCJS89-AOLOFKZ92921AK-AKASKZJAN178-3D1"
self.lyr_cache = LyricsCache.Cache() self.lyr_cache = LyricsCache.Cache()
self.redis_cache = redis_cache.RedisCache() self.redis_cache = redis_cache.RedisCache()
self.redis_client = redis.Redis(password=private.REDIS_PW) self.redis_client = redis.Redis(password=private.REDIS_PW)
@ -44,7 +46,7 @@ class Misc(FastAPI):
return artistsong return artistsong
async def homepage_redis_widget(self) -> dict: async def homepage_redis_widget(self) -> JSONResponse:
"""Homepage Redis Widget Handler""" """Homepage Redis Widget Handler"""
# Measure response time w/ test lyric search # Measure response time w/ test lyric search
@ -58,33 +60,44 @@ class Misc(FastAPI):
num_ci_keys = len(ci_keys) num_ci_keys = len(ci_keys)
index_info = await self.redis_client.ft().info() index_info = await self.redis_client.ft().info()
indexed_lyrics: int = index_info.get('num_docs') indexed_lyrics: int = index_info.get('num_docs')
return { return JSONResponse(content={
'responseTime': round(response_time, 7), 'responseTime': round(response_time, 7),
'storedKeys': total_keys, 'storedKeys': total_keys,
'indexedLyrics': indexed_lyrics, 'indexedLyrics': indexed_lyrics,
'sessions': num_ci_keys, 'sessions': num_ci_keys,
} })
async def homepage_sqlite_widget(self) -> dict: async def homepage_sqlite_widget(self) -> JSONResponse:
"""Homepage SQLite Widget Handler""" """Homepage SQLite Widget Handler"""
row_count: int = await self.lyr_cache.sqlite_rowcount() row_count: int = await self.lyr_cache.sqlite_rowcount()
distinct_artists: int = await self.lyr_cache.sqlite_distinct("artist") distinct_artists: int = await self.lyr_cache.sqlite_distinct("artist")
lyrics_length: int = await self.lyr_cache.sqlite_lyrics_length() lyrics_length: int = await self.lyr_cache.sqlite_lyrics_length()
return { return JSONResponse(content={
'storedRows': row_count, 'storedRows': row_count,
'distinctArtists': distinct_artists, 'distinctArtists': distinct_artists,
'lyricsLength': lyrics_length, 'lyricsLength': lyrics_length,
} })
async def homepage_lyrics_widget(self) -> dict: async def homepage_lyrics_widget(self) -> dict:
"""Homepage Lyrics Widget Handler""" """Homepage Lyrics Widget Handler"""
found_counts: dict = await self.redis_cache.get_found_counts()
return await self.redis_cache.get_found_counts() if not isinstance(found_counts, dict):
return {
'err': True,
'errorText': 'General failure.',
}
logging.info("Found: %s", found_counts)
return found_counts
async def homepage_radio_widget(self) -> dict: async def homepage_radio_widget(self) -> JSONResponse:
"""Homepage Radio Widget Handler""" """Homepage Radio Widget Handler"""
radio_np: str = await self.get_radio_np()
return { if not radio_np:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'General failure.',
})
return JSONResponse(content={
'now_playing': await self.get_radio_np(), 'now_playing': await self.get_radio_np(),
} })

View File

@ -2,22 +2,16 @@
import logging import logging
import traceback import traceback
import os
import aiosqlite as sqlite3
import time import time
import random import random
import asyncio import asyncio
import regex
import music_tag
from . import radio_util from . import radio_util
from .constructors import ValidRadioNextRequest, ValidRadioReshuffleRequest, ValidRadioQueueShiftRequest,\ from .constructors import ValidRadioNextRequest, ValidRadioReshuffleRequest, ValidRadioQueueShiftRequest,\
ValidRadioQueueRemovalRequest, ValidRadioSongRequest,\ ValidRadioQueueRemovalRequest, ValidRadioSongRequest, RadioException
ValidRadioQueueGetRequest, RadioException
from uuid import uuid4 as uuid from uuid import uuid4 as uuid
from typing import Optional, LiteralString from typing import Optional
from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse, JSONResponse
from aiohttp import ClientSession, ClientTimeout
# pylint: disable=bare-except, broad-exception-caught, invalid-name # pylint: disable=bare-except, broad-exception-caught, invalid-name
@ -28,8 +22,9 @@ TODO:
class Radio(FastAPI): class Radio(FastAPI):
"""Radio Endpoints""" """Radio Endpoints"""
def __init__(self, app: FastAPI, my_util, constants) -> None: # pylint: disable=super-init-not-called def __init__(self, app: FastAPI,
self.app = app my_util, constants) -> None: # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = my_util self.util = my_util
self.constants = constants self.constants = constants
self.radio_util = radio_util.RadioUtil(self.constants) self.radio_util = radio_util.RadioUtil(self.constants)
@ -55,7 +50,8 @@ class Radio(FastAPI):
asyncio.get_event_loop().run_until_complete(self.radio_util.load_playlist()) asyncio.get_event_loop().run_until_complete(self.radio_util.load_playlist())
asyncio.get_event_loop().run_until_complete(self.radio_util._ls_skip()) asyncio.get_event_loop().run_until_complete(self.radio_util._ls_skip())
async def radio_skip(self, data: ValidRadioNextRequest, request: Request) -> bool: async def radio_skip(self, data: ValidRadioNextRequest,
request: Request) -> JSONResponse:
""" """
Skip to the next track in the queue, or to uuid specified in skipTo if provided Skip to the next track in the queue, or to uuid specified in skipTo if provided
""" """
@ -65,17 +61,28 @@ class Radio(FastAPI):
if data.skipTo: if data.skipTo:
queue_item = self.radio_util.get_queue_item_by_uuid(data.skipTo) queue_item = self.radio_util.get_queue_item_by_uuid(data.skipTo)
if not queue_item: if not queue_item:
return False return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'No such queue item.',
})
self.radio_util.active_playlist = self.radio_util.active_playlist[queue_item[0]:] self.radio_util.active_playlist = self.radio_util.active_playlist[queue_item[0]:]
if not self.radio_util.active_playlist: if not self.radio_util.active_playlist:
await self.radio_util.load_playlist() await self.radio_util.load_playlist()
return await self.radio_util._ls_skip() skip_result: bool = await self.radio_util._ls_skip()
status_code = 200 if skip_result else 500
return JSONResponse(status_code=status_code, content={
'success': skip_result,
})
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
return False return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'General failure.',
})
async def radio_reshuffle(self, data: ValidRadioReshuffleRequest, request: Request) -> dict: async def radio_reshuffle(self, data: ValidRadioReshuffleRequest,
request: Request) -> JSONResponse:
""" """
Reshuffle the play queue Reshuffle the play queue
""" """
@ -83,16 +90,16 @@ class Radio(FastAPI):
raise HTTPException(status_code=403, detail="Unauthorized") raise HTTPException(status_code=403, detail="Unauthorized")
random.shuffle(self.radio_util.active_playlist) random.shuffle(self.radio_util.active_playlist)
return { return JSONResponse(content={
'ok': True 'ok': True
} })
async def radio_get_queue(self, request: Request, limit: Optional[int] = 15_000) -> dict: async def radio_get_queue(self, request: Request,
limit: Optional[int] = 15_000) -> JSONResponse:
""" """
Get current play queue, up to limit [default: 15k] Get current play queue, up to limit [default: 15k]
""" """
queue_out: list[dict] = [] queue_out: list[dict] = []
for x, item in enumerate(self.radio_util.active_playlist[0:limit]): for x, item in enumerate(self.radio_util.active_playlist[0:limit]):
queue_out.append({ queue_out.append({
@ -104,45 +111,52 @@ class Radio(FastAPI):
'artistsong': item.get('artistsong'), 'artistsong': item.get('artistsong'),
'duration': item.get('duration'), 'duration': item.get('duration'),
}) })
return { return JSONResponse(content={
'items': queue_out 'items': queue_out
} })
async def radio_queue_shift(self, data: ValidRadioQueueShiftRequest, request: Request) -> dict: async def radio_queue_shift(self, data: ValidRadioQueueShiftRequest,
"""Shift position of a UUID within the queue [currently limited to playing next or immediately]""" request: Request) -> JSONResponse:
"""
Shift position of a UUID within the queue
[currently limited to playing next or immediately]
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized") raise HTTPException(status_code=403, detail="Unauthorized")
queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid) queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid)
if not queue_item: if not queue_item:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Queue item not found.', 'errorText': 'Queue item not found.',
} })
(x, item) = queue_item (x, item) = queue_item
self.radio_util.active_playlist.pop(x) self.radio_util.active_playlist.pop(x)
self.radio_util.active_playlist.insert(0, item) self.radio_util.active_playlist.insert(0, item)
if not data.next: if not data.next:
await self.radio_util._ls_skip() await self.radio_util._ls_skip()
return { return JSONResponse(content={
'ok': True, 'ok': True,
} })
async def radio_queue_remove(self, data: ValidRadioQueueRemovalRequest, request: Request) -> dict: async def radio_queue_remove(self, data: ValidRadioQueueRemovalRequest,
"""Remove an item from the current play queue""" request: Request) -> JSONResponse:
"""
Remove an item from the current play queue
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized") raise HTTPException(status_code=403, detail="Unauthorized")
queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid) queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid)
if not queue_item: if not queue_item:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Queue item not found.', 'errorText': 'Queue item not found.',
} })
self.radio_util.active_playlist.pop(queue_item[0]) self.radio_util.active_playlist.pop(queue_item[0])
return { return JSONResponse(content={
'ok': True, 'ok': True,
} })
async def album_art_handler(self, request: Request, track_id: Optional[int] = None) -> Response: async def album_art_handler(self, request: Request, track_id: Optional[int] = None) -> Response:
""" """
@ -164,22 +178,22 @@ class Radio(FastAPI):
return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg", return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg",
status_code=302) status_code=302)
async def radio_now_playing(self, request: Request) -> dict: async def radio_now_playing(self, request: Request) -> JSONResponse:
"""Get currently playing track info""" """
Get currently playing track info
"""
ret_obj: dict = {**self.radio_util.now_playing} ret_obj: dict = {**self.radio_util.now_playing}
cur_elapsed: int = self.radio_util.now_playing.get('elapsed', -1)
cur_duration: int = self.radio_util.now_playing.get('duration', 999999)
try: try:
ret_obj['elapsed'] = int(time.time()) - ret_obj['start'] ret_obj['elapsed'] = int(time.time()) - ret_obj['start']
except KeyError: except KeyError:
traceback.print_exc() traceback.print_exc()
ret_obj['elapsed'] = 0 ret_obj['elapsed'] = 0
ret_obj.pop('file_path') ret_obj.pop('file_path')
return ret_obj return JSONResponse(content=ret_obj)
async def radio_get_next(self, data: ValidRadioNextRequest, request: Request, async def radio_get_next(self, data: ValidRadioNextRequest, request: Request,
background_tasks: BackgroundTasks) -> Optional[dict]: background_tasks: BackgroundTasks) -> JSONResponse:
""" """
Get next track Get next track
Track will be removed from the queue in the process. Track will be removed from the queue in the process.
@ -189,13 +203,19 @@ class Radio(FastAPI):
if not isinstance(self.radio_util.active_playlist, list) or not self.radio_util.active_playlist: if not isinstance(self.radio_util.active_playlist, list) or not self.radio_util.active_playlist:
await self.radio_util.load_playlist() await self.radio_util.load_playlist()
await self.radio_util._ls_skip() await self.radio_util._ls_skip()
return None return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'General failure occurred, prompting playlist reload.',
})
next = self.radio_util.active_playlist.pop(0) next = self.radio_util.active_playlist.pop(0)
if not isinstance(next, dict): if not isinstance(next, dict):
logging.critical("next is of type: %s, reloading playlist...", type(next)) logging.critical("next is of type: %s, reloading playlist...", type(next))
await self.radio_util.load_playlist() await self.radio_util.load_playlist()
await self.radio_util._ls_skip() await self.radio_util._ls_skip()
return None return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'General failure occurred, prompting playlist reload.',
})
duration: int = next['duration'] duration: int = next['duration']
time_started: int = int(time.time()) time_started: int = int(time.time())
@ -216,37 +236,41 @@ class Radio(FastAPI):
try: try:
if not await self.radio_util.get_album_art(file_path=next['file_path']): if not await self.radio_util.get_album_art(file_path=next['file_path']):
album_art = await self.radio_util.get_album_art(file_path=next['file_path']) album_art = await self.radio_util.get_album_art(file_path=next['file_path'])
if not album_art: if album_art:
return None await self.radio_util.cache_album_art(next['id'], album_art)
await self.radio_util.cache_album_art(next['id'], album_art) else:
logging.debug("Could not read album art for %s",
next['file_path'])
except: except:
traceback.print_exc() traceback.print_exc()
return next return JSONResponse(content=next)
async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> dict: async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> JSONResponse:
"""Song request handler""" """
Song request handler
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized") raise HTTPException(status_code=403, detail="Unauthorized")
artistsong: Optional[str] = data.artistsong artistsong: Optional[str] = data.artistsong
artist: Optional[str] = data.artist artist: Optional[str] = data.artist
song: Optional[str] = data.song song: Optional[str] = data.song
if artistsong and (artist or song): if artistsong and (artist or song):
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request', 'errorText': 'Invalid request',
} })
if not artistsong and (not artist or not song): if not artistsong and (not artist or not song):
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request', 'errorText': 'Invalid request',
} })
search: bool = await self.radio_util.search_playlist(artistsong=artistsong, search: bool = await self.radio_util.search_playlist(artistsong=artistsong,
artist=artist, artist=artist,
song=song) song=song)
if data.alsoSkip: if data.alsoSkip:
await self.radio_util._ls_skip() await self.radio_util._ls_skip()
return { return JSONResponse(content={
'result': search 'result': search
} })

View File

@ -20,13 +20,17 @@ from .constructors import RadioException
double_space = regex.compile(r'\s{2,}') double_space = regex.compile(r'\s{2,}')
class RadioUtil: class RadioUtil:
"""
Radio Utils
"""
def __init__(self, constants) -> None: def __init__(self, constants) -> None:
self.constants = constants self.constants = constants
self.gpt = gpt.GPT(self.constants) self.gpt = gpt.GPT(self.constants)
self.ls_uri: str = "http://10.10.10.101:29000" self.ls_uri: str = "http://10.10.10.101:29000"
self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so'] self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
self.active_playlist_path: str|LiteralString = os.path.join("/usr/local/share", self.active_playlist_path: Union[str, LiteralString] = os.path\
"sqlite_dbs", "track_file_map.db") .join("/usr/local/share",
"sqlite_dbs", "track_file_map.db")
self.active_playlist_name = "default" # not used self.active_playlist_name = "default" # not used
self.active_playlist: list[dict] = [] self.active_playlist: list[dict] = []
self.now_playing: dict = { self.now_playing: dict = {
@ -49,17 +53,19 @@ class RadioUtil:
} }
} }
def duration_conv(self, s: int|float) -> str: def duration_conv(self,
s: Union[int, float]) -> str:
""" """
Convert duration given in seconds to hours, minutes, and seconds (h:m:s) Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
Args: Args:
s (int|float): seconds to convert s (Union[int, float]): seconds to convert
Returns: Returns:
str str
""" """
return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0] return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]
async def search_playlist(self, artistsong: Optional[str] = None, artist: Optional[str] = None, async def search_playlist(self, artistsong: Optional[str] = None,
artist: Optional[str] = None,
song: Optional[str] = None) -> bool: song: Optional[str] = None) -> bool:
""" """
Search for track, add it up next in play queue if found Search for track, add it up next in play queue if found
@ -137,7 +143,7 @@ class RadioUtil:
LIMITED TO ONE ARTIST... LIMITED TO ONE ARTIST...
""" """
# db_query: str = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\ # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
# WHERE artist LIKE "%bad omens%" GROUP BY artistdashsong ORDER BY RANDOM()' # WHERE artist LIKE "%bad omens%" GROUP BY artistdashsong ORDER BY RANDOM()'
async with sqlite3.connect(self.active_playlist_path, async with sqlite3.connect(self.active_playlist_path,
@ -160,7 +166,8 @@ class RadioUtil:
except: except:
traceback.print_exc() traceback.print_exc()
async def cache_album_art(self, track_id: int, album_art: bytes) -> None: async def cache_album_art(self, track_id: int,
album_art: bytes) -> None:
""" """
Cache Album Art to SQLite DB Cache Album Art to SQLite DB
Args: Args:
@ -201,7 +208,7 @@ class RadioUtil:
async with await db_conn.execute(query, async with await db_conn.execute(query,
query_params) as db_cursor: query_params) as db_cursor:
result: Optional[sqlite3.Row|bool] = await db_cursor.fetchone() result: Optional[Union[sqlite3.Row, bool]] = await db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row): if not result or not isinstance(result, sqlite3.Row):
return None return None
return result['album_art'] return result['album_art']
@ -209,13 +216,14 @@ class RadioUtil:
traceback.print_exc() traceback.print_exc()
return None return None
def get_queue_item_by_uuid(self, uuid: str) -> Optional[tuple[int, dict]]: def get_queue_item_by_uuid(self,
uuid: str) -> Optional[tuple[int, dict]]:
""" """
Get queue item by UUID Get queue item by UUID
Args: Args:
uuid: The UUID to search uuid: The UUID to search
Returns: Returns:
dict|None Optional[tuple[int, dict]]
""" """
for x, item in enumerate(self.active_playlist): for x, item in enumerate(self.active_playlist):
if item.get('uuid') == uuid: if item.get('uuid') == uuid:
@ -242,16 +250,18 @@ class RadioUtil:
return False # failsafe return False # failsafe
async def get_ai_song_info(self, artist: str, song: str) -> Optional[str]: async def get_ai_song_info(self, artist: str,
song: str) -> Optional[str]:
""" """
Get AI Song Info Get AI Song Info
Args: Args:
artist (str) artist (str)
song (str) song (str)
Returns: Returns:
str|None Optional[str]
""" """
response: Optional[str] = await self.gpt.get_completion(prompt=f"I am going to listen to {song} by {artist}.") prompt: str = f" am going to listen to {song} by {artist}."
response: Optional[str] = await self.gpt.get_completion(prompt)
if not response: if not response:
logging.critical("No response received from GPT?") logging.critical("No response received from GPT?")
return None return None
@ -298,7 +308,7 @@ class RadioUtil:
}, },
{ {
"name": "Higher Res", "name": "Higher Res",
"value": "[stream/icecast](https://relay.sfm.codey.lol/aces.ogg) || [web player](https://codey.lol/radio)", "value": "[stream/icecast](https://relay.sfm.codey.lol/aces.ogg) | [web player](https://codey.lol/radio)",
"inline": True, "inline": True,
} }
] ]

View File

@ -2,22 +2,26 @@
import os import os
import random import random
from typing import LiteralString, Optional from typing import Union, LiteralString
import aiosqlite as sqlite3 import aiosqlite as sqlite3
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.responses import JSONResponse
from .constructors import RandMsgRequest from .constructors import RandMsgRequest
class RandMsg(FastAPI): class RandMsg(FastAPI):
"""Random Message Endpoint""" """
def __init__(self, app: FastAPI, util, constants): # pylint: disable=super-init-not-called Random Message Endpoint
self.app = app """
def __init__(self, app: FastAPI,
util, constants) -> None: # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = util self.util = util
self.constants = constants self.constants = constants
self.endpoint_name = "randmsg" self.endpoint_name = "randmsg"
app.add_api_route(f"/{self.endpoint_name}", self.randmsg_handler, methods=["POST"]) app.add_api_route(f"/{self.endpoint_name}", self.randmsg_handler, methods=["POST"])
async def randmsg_handler(self, data: RandMsgRequest): async def randmsg_handler(self, data: RandMsgRequest) -> JSONResponse:
""" """
Get a randomly generated message Get a randomly generated message
""" """
@ -25,16 +29,16 @@ class RandMsg(FastAPI):
short: bool = data.short if data.short else False short: bool = data.short if data.short else False
if not short: if not short:
db_rand_selected = random.choice([0, 1, 3]) db_rand_selected: int = random.choice([0, 1, 3])
else: else:
db_rand_selected = 9 db_rand_selected = 9
title_attr = "Unknown" title_attr: str = "Unknown"
match db_rand_selected: match db_rand_selected:
case 0: case 0:
randmsg_db_path = os.path.join("/usr/local/share", randmsg_db_path: Union[str, LiteralString] = os.path.join("/usr/local/share",
"sqlite_dbs", "qajoke.db") # For qajoke db "sqlite_dbs", "qajoke.db") # For qajoke db
db_query = "SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \ db_query: str = "SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
|| answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db || answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db
title_attr = "QA Joke DB" title_attr = "QA Joke DB"
case 1 | 9: case 1 | 9:
@ -74,23 +78,16 @@ class RandMsg(FastAPI):
db_query = """SELECT id, (title || "<br>" || body) FROM jokes \ db_query = """SELECT id, (title || "<br>" || body) FROM jokes \
WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1""" WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1"""
title_attr = "r/jokes DB" title_attr = "r/jokes DB"
case 6:
randmsg_db_path = os.path.join("/usr/local/share",
"sqlite_dbs",
"donnies.db") # Donnies DB
random.seed()
twilight_or_mice: str = random.choice(["twilight", "mice"])
db_query = f"SELECT id, text FROM {twilight_or_mice} ORDER BY RANDOM() LIMIT 1"
title_attr = "Donnies DB"
async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db: async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor: async with await _db.execute(db_query) as _cursor:
result = await _cursor.fetchone() result: sqlite3.Row = await _cursor.fetchone()
(result_id, result_msg) = result (result_id, result_msg) = result
result_msg = result_msg.strip() result_msg = result_msg.strip()
return { return JSONResponse(content=
"id": result_id, {
"msg": result_msg, "id": result_id,
'title': title_attr "msg": result_msg,
} "title": title_attr,
})

View File

@ -3,13 +3,16 @@
import os import os
import aiosqlite as sqlite3 import aiosqlite as sqlite3
from fastapi import FastAPI from fastapi import FastAPI
from typing import Optional, LiteralString from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
class Transcriptions(FastAPI): class Transcriptions(FastAPI):
"""Transcription Endpoints""" """
Transcription Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called
self.app = app self.app: FastAPI = app
self.util = util self.util = util
self.constants = constants self.constants = constants
@ -23,26 +26,28 @@ class Transcriptions(FastAPI):
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=False) include_in_schema=False)
async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> dict: async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> JSONResponse:
"""Get list of episodes by show id""" """
Get list of episodes by show id
"""
show_id: int = data.s show_id: int = data.s
db_path: Optional[str|LiteralString] = None db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None db_query: Optional[str] = None
show_title: Optional[str] = None show_title: Optional[str] = None
if show_id is None: if not show_id:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid request', 'errorText': 'Invalid request',
} })
show_id = int(show_id) show_id = int(show_id)
if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]: if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Show not found.', 'errorText': 'Show not found.',
} })
match show_id: match show_id:
case 0: case 0:
@ -61,33 +66,35 @@ class Transcriptions(FastAPI):
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP""" db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
show_title = "Parks And Rec" show_title = "Parks And Rec"
case _: case _:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Unknown error.' 'errorText': 'Unknown error.',
} })
async with sqlite3.connect(database=db_path, timeout=1) as _db: async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor: async with await _db.execute(db_query) as _cursor:
result: list[tuple] = await _cursor.fetchall() result: list[tuple] = await _cursor.fetchall()
return { return JSONResponse(content={
"show_title": show_title, "show_title": show_title,
"episodes": [ "episodes": [
{ {
'id': item[1], 'id': item[1],
'ep_friendly': item[0] 'ep_friendly': item[0],
} for item in result] } for item in result],
} })
async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> dict: async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> JSONResponse:
"""Get lines for a particular episode""" """
show_id: int = data.s Get lines for a particular episode
episode_id: int = data.e """
show_id: int = int(data.s)
episode_id: int = int(data.e)
# pylint: disable=line-too-long # pylint: disable=line-too-long
match show_id: match show_id:
case 0: case 0:
db_path = os.path.join("/usr/local/share", db_path: Union[str, LiteralString] = os.path.join("/usr/local/share",
"sqlite_dbs", "sp.db") "sqlite_dbs", "sp.db")
db_query = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?""" db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1: case 1:
db_path = os.path.join("/usr/local/share", db_path = os.path.join("/usr/local/share",
"sqlite_dbs", "futur.db") "sqlite_dbs", "futur.db")
@ -98,23 +105,24 @@ class Transcriptions(FastAPI):
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC""" db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
case _: case _:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Unknown error' 'errorText': 'Unknown error',
} })
async with sqlite3.connect(database=db_path, timeout=1) as _db: async with sqlite3.connect(database=db_path,
timeout=1) as _db:
params: tuple = (episode_id,) params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor: async with await _db.execute(db_query, params) as _cursor:
result: list[tuple] = await _cursor.fetchall() result: list[tuple] = await _cursor.fetchall()
first_result: tuple = result[0] first_result: tuple = result[0]
return { return JSONResponse(content={
'episode_id': episode_id, 'episode_id': episode_id,
'ep_friendly': first_result[0].strip(), 'ep_friendly': first_result[0].strip(),
'lines': [ 'lines': [
{ {
'speaker': item[1].strip(), 'speaker': item[1].strip(),
'line': item[2].strip() 'line': item[2].strip(),
} for item in result] } for item in result],
} })

View File

@ -3,7 +3,7 @@
import logging import logging
from typing import Optional from typing import Optional
from fastapi import FastAPI, Request, HTTPException from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel from fastapi.responses import JSONResponse
from aiohttp import ClientSession, ClientTimeout from aiohttp import ClientSession, ClientTimeout
from .constructors import ValidXCRequest from .constructors import ValidXCRequest
# pylint: disable=invalid-name # pylint: disable=invalid-name
@ -11,7 +11,7 @@ from .constructors import ValidXCRequest
class XC(FastAPI): class XC(FastAPI):
"""XC (CrossComm) Endpoints""" """XC (CrossComm) Endpoints"""
def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called
self.app = app self.app: FastAPI = app
self.util = util self.util = util
self.constants = constants self.constants = constants
@ -23,12 +23,13 @@ class XC(FastAPI):
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=False) include_in_schema=False)
async def xc_handler(self, data: ValidXCRequest, request: Request) -> dict: async def xc_handler(self, data: ValidXCRequest,
request: Request) -> JSONResponse:
"""Handle XC Commands""" """Handle XC Commands"""
try: try:
key: str = data.key key: str = data.key
bid: int = data.bid bid: int = int(data.bid)
cmd: str = data.cmd cmd: str = data.cmd
cmd_data: Optional[dict] = data.data cmd_data: Optional[dict] = data.data
if not self.util.check_key(path=request.url.path, req_type=0, key=key): if not self.util.check_key(path=request.url.path, req_type=0, key=key):
@ -40,10 +41,10 @@ class XC(FastAPI):
} }
if not bid in BID_ADDR_MAP: if not bid in BID_ADDR_MAP:
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'Invalid bot id' 'errorText': 'Invalid bot id'
} })
bot_api_url: str = f'http://{BID_ADDR_MAP[bid]}/' bot_api_url: str = f'http://{BID_ADDR_MAP[bid]}/'
async with ClientSession() as session: async with ClientSession() as session:
@ -51,13 +52,13 @@ class XC(FastAPI):
'Content-Type': 'application/json; charset=utf-8' 'Content-Type': 'application/json; charset=utf-8'
}, timeout=ClientTimeout(connect=5, sock_read=5)) as aiohttp_request: }, timeout=ClientTimeout(connect=5, sock_read=5)) as aiohttp_request:
response: dict = await aiohttp_request.json() response: dict = await aiohttp_request.json()
return { return JSONResponse(content={
'success': True, 'success': True,
'response': response 'response': response
} })
except Exception as e: except Exception as e:
logging.debug("Error: %s", str(e)) logging.debug("Error: %s", str(e))
return { return JSONResponse(status_code=500, content={
'err': True, 'err': True,
'errorText': 'General error.', 'errorText': 'General error.',
} })

View File

@ -2,28 +2,30 @@
import importlib import importlib
from fastapi import FastAPI from fastapi import FastAPI
from pydantic import BaseModel from fastapi.responses import JSONResponse
from typing import Optional from typing import Optional, Union
from .constructors import ValidYTSearchRequest from .constructors import ValidYTSearchRequest
class YT(FastAPI): class YT(FastAPI):
"""YT Endpoints""" """
def __init__(self, app: FastAPI, util, constants) -> None: # pylint: disable=super-init-not-called YT Endpoints
self.app = app """
def __init__(self, app: FastAPI, util,
constants) -> None: # pylint: disable=super-init-not-called
self.app: FastAPI = app
self.util = util self.util = util
self.constants = constants self.constants = constants
self.ytsearch = importlib.import_module("youtube_search_async").YoutubeSearch() self.ytsearch = importlib.import_module("youtube_search_async").YoutubeSearch()
self.endpoints: dict = { self.endpoints: dict = {
"yt/search": self.yt_video_search_handler, "yt/search": self.yt_video_search_handler,
#tbd
} }
for endpoint, handler in self.endpoints.items(): for endpoint, handler in self.endpoints.items():
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
include_in_schema=True) include_in_schema=True)
async def yt_video_search_handler(self, data: ValidYTSearchRequest) -> dict: async def yt_video_search_handler(self, data: ValidYTSearchRequest) -> JSONResponse:
""" """
Search for YT Video by Title (closest match returned) Search for YT Video by Title (closest match returned)
- **t**: Title to search - **t**: Title to search
@ -32,13 +34,13 @@ class YT(FastAPI):
title: str = data.t title: str = data.t
yts_res: Optional[list[dict]] = await self.ytsearch.search(title) yts_res: Optional[list[dict]] = await self.ytsearch.search(title)
if not yts_res: if not yts_res:
return { return JSONResponse(status_code=404, content={
'err': True, 'err': True,
'errorText': 'No result.', 'errorText': 'No result.',
} })
yt_video_id: str|bool = yts_res[0].get('id', False) yt_video_id: Union[str, bool] = yts_res[0].get('id', False)
return { return JSONResponse(content={
'video_id': yt_video_id, 'video_id': yt_video_id,
'extras': yts_res[0] 'extras': yts_res[0],
} })

View File

@ -3,16 +3,18 @@ from typing import Optional
from openai import AsyncOpenAI from openai import AsyncOpenAI
class GPT: class GPT:
def __init__(self, constants): def __init__(self, constants) -> None:
self.constants = constants self.constants = constants
self.api_key = self.constants.OPENAI_API_KEY self.api_key: str = self.constants.OPENAI_API_KEY
self.client = AsyncOpenAI( self.client: AsyncOpenAI = AsyncOpenAI(
api_key=self.api_key, api_key=self.api_key,
timeout=10.0, timeout=10.0,
) )
self.default_system_prompt = "You are a helpful assistant who will provide only totally accurate tidbits of info on the specific songs the user may listen to." self.default_system_prompt: str = """You are a helpful assistant who will provide only totally accurate tidbits of \
info on the specific songs the user may listen to."""
async def get_completion(self, prompt: str, system_prompt: Optional[str] = None) -> str: async def get_completion(self, prompt: str,
system_prompt: Optional[str] = None) -> Optional[str]:
if not system_prompt: if not system_prompt:
system_prompt = self.default_system_prompt system_prompt = self.default_system_prompt
chat_completion = await self.client.chat.completions.create( chat_completion = await self.client.chat.completions.create(
@ -29,4 +31,5 @@ class GPT:
model="gpt-4o-mini", model="gpt-4o-mini",
temperature=0.35, temperature=0.35,
) )
return chat_completion.choices[0].message.content response: Optional[str] = chat_completion.choices[0].message.content
return response

View File

@ -20,7 +20,7 @@ class LastFM:
async def search_artist(self, artist: Optional[str] = None) -> dict: async def search_artist(self, artist: Optional[str] = None) -> dict:
"""Search LastFM for an artist""" """Search LastFM for an artist"""
try: try:
if artist is None: if not artist:
return { return {
'err': 'No artist specified.', 'err': 'No artist specified.',
} }
@ -91,8 +91,7 @@ class LastFM:
dict dict
""" """
try: try:
if artist is None or album is None: if not artist or not album:
logging.info("inv request")
return { return {
'err': 'No artist or album specified', 'err': 'No artist or album specified',
} }
@ -111,16 +110,16 @@ class LastFM:
'err': 'General Failure', 'err': 'General Failure',
} }
async def get_artist_albums(self, artist: Optional[str] = None) -> dict|list[dict]: async def get_artist_albums(self, artist: Optional[str] = None) -> Union[dict, list[dict]]:
""" """
Get Artists Albums from LastFM Get Artists Albums from LastFM
Args: Args:
artist (Optional[str]) artist (Optional[str])
Returns: Returns:
dict|list[dict] Union[dict, list[dict]]
""" """
try: try:
if artist is None: if not artist:
return { return {
'err': 'No artist specified.', 'err': 'No artist specified.',
} }
@ -149,10 +148,10 @@ class LastFM:
Args: Args:
artist (Optional[str]) artist (Optional[str])
Returns: Returns:
int|dict int
""" """
try: try:
if artist is None: if not artist:
return -1 return -1
artist_search: dict = await self.search_artist(artist=artist) artist_search: dict = await self.search_artist(artist=artist)
if not artist_search: if not artist_search:

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3.12 #!/usr/bin/env python3.12
from dataclasses import dataclass, asdict from dataclasses import dataclass
from typing import Union
@dataclass @dataclass
class LyricsResult: class LyricsResult:
@ -10,12 +11,12 @@ class LyricsResult:
artist (str): returned artist artist (str): returned artist
song (str): returned song song (str): returned song
src (str): source result was fetched from src (str): source result was fetched from
lyrics (str|list): str if plain lyrics, list for lrc lyrics (Union[str, list]): str if plain lyrics, list for lrc
time (float): time taken to retrieve lyrics from source time (float): time taken to retrieve lyrics from source
""" """
artist: str artist: str
song: str song: str
src: str src: str
lyrics: str|list lyrics: Union[str, list]
confidence: int confidence: int
time: float = 0.00 time: float = 0.00

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3.12 #!/usr/bin/env python3.12
# pylint: disable=wrong-import-order, wrong-import-position
from typing import Optional from typing import Optional
from lyric_search.constructors import LyricsResult from lyric_search.constructors import LyricsResult
@ -22,7 +21,8 @@ class Aggregate:
self.redis_cache = redis_cache.RedisCache() self.redis_cache = redis_cache.RedisCache()
self.notifier = notifier.DiscordNotifier() self.notifier = notifier.DiscordNotifier()
async def search(self, artist: str, song: str, plain: bool = True) -> Optional[LyricsResult]: async def search(self, artist: str, song: str,
plain: Optional[bool] = True) -> Optional[LyricsResult]:
""" """
Aggregate Search Aggregate Search
Args: Args:
@ -30,7 +30,7 @@ class Aggregate:
song (str): Song to search song (str): Song to search
plain (bool): Search for plain lyrics (lrc otherwise) plain (bool): Search for plain lyrics (lrc otherwise)
Returns: Returns:
LyricsResult|None: The result, if found - None otherwise. Optional[LyricsResult]: The result, if found - None otherwise.
""" """
if not plain: if not plain:
logging.info("LRCs requested, limiting search to LRCLib") logging.info("LRCs requested, limiting search to LRCLib")

View File

@ -9,7 +9,7 @@ import sys
import traceback import traceback
sys.path.insert(1,'..') sys.path.insert(1,'..')
sys.path.insert(1,'.') sys.path.insert(1,'.')
from typing import Optional, Any from typing import Optional, Union, LiteralString
import aiosqlite as sqlite3 import aiosqlite as sqlite3
from . import redis_cache from . import redis_cache
from lyric_search import utils, notifier from lyric_search import utils, notifier
@ -23,7 +23,7 @@ log_level = logging.getLevelName(logger.level)
class Cache: class Cache:
"""Cache Search Module""" """Cache Search Module"""
def __init__(self) -> None: def __init__(self) -> None:
self.cache_db: str = os.path.join("/", "usr", "local", "share", self.cache_db: Union[str, LiteralString] = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "cached_lyrics.db") "sqlite_dbs", "cached_lyrics.db")
self.redis_cache = redis_cache.RedisCache() self.redis_cache = redis_cache.RedisCache()
self.notifier = notifier.DiscordNotifier() self.notifier = notifier.DiscordNotifier()
@ -34,16 +34,17 @@ class Cache:
self.label: str = "Cache" self.label: str = "Cache"
def get_matched(self, matched_candidate: tuple, confidence: int, def get_matched(self, matched_candidate: tuple, confidence: int,
sqlite_rows: list[sqlite3.Row] = None, redis_results: Any = None) -> Optional[LyricsResult]: sqlite_rows: Optional[list[sqlite3.Row]] = None,
redis_results: Optional[list] = None) -> Optional[LyricsResult]:
""" """
Get Matched Result Get Matched Result
Args: Args:
matched_candidate (tuple): the correctly matched candidate returned by matcher.best_match matched_candidate (tuple): the correctly matched candidate returned by matcher.best_match
confidence (int): % confidence confidence (int): % confidence
sqlite_rows (list[sqlite3.Row]|None): List of returned rows from SQLite DB, or None if Redis sqlite_rows (Optional[list[sqlite3.Row]]): List of returned rows from SQLite DB, or None if Redis
redis_results (Any): List of Redis returned data, or None if SQLite redis_results (Any): List of Redis returned data, or None if SQLite
Returns: Returns:
LyricsResult|None: The result, if found - None otherwise. Optional[LyricsResult]: The result, if found - None otherwise.
""" """
matched_id: int = matched_candidate[0] matched_id: int = matched_candidate[0]
if redis_results: if redis_results:
@ -60,7 +61,7 @@ class Cache:
else: else:
for row in sqlite_rows: for row in sqlite_rows:
if row[0] == matched_id: if row[0] == matched_id:
(_id, artist, song, lyrics, original_src, _confidence) = row (_id, artist, song, lyrics, original_src) = row[:-1]
return LyricsResult( return LyricsResult(
artist=artist, artist=artist,
song=song, song=song,
@ -119,7 +120,8 @@ class Cache:
await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
f"cache::store >> {str(e)}") f"cache::store >> {str(e)}")
async def sqlite_rowcount(self, where: Optional[str] = None, params: Optional[tuple] = None) -> int: async def sqlite_rowcount(self, where: Optional[str] = None,
params: Optional[tuple] = None) -> int:
""" """
Get rowcount for cached_lyrics DB Get rowcount for cached_lyrics DB
Args: Args:
@ -217,7 +219,7 @@ class Cache:
artist: the artist to search artist: the artist to search
song: the song to search song: the song to search
Returns: Returns:
LyricsResult|None: The result, if found - None otherwise. Optional[LyricsResult]: The result, if found - None otherwise.
""" """
try: try:
# pylint: enable=unused-argument # pylint: enable=unused-argument
@ -253,7 +255,7 @@ class Cache:
result_tracks.append((key, f"{track['artist']} - {track['song']}")) result_tracks.append((key, f"{track['artist']} - {track['song']}"))
if not random_search: if not random_search:
best_match: tuple|None = matcher.find_best_match(input_track=input_track, best_match: Optional[tuple] = matcher.find_best_match(input_track=input_track,
candidate_tracks=result_tracks) candidate_tracks=result_tracks)
else: else:
best_match = (result_tracks[0], 100) best_match = (result_tracks[0], 100)
@ -298,7 +300,7 @@ class Cache:
(_id, _artist, _song, _lyrics, _src, _confidence) = track (_id, _artist, _song, _lyrics, _src, _confidence) = track
result_tracks.append((_id, f"{_artist} - {_song}")) result_tracks.append((_id, f"{_artist} - {_song}"))
if not random_search: if not random_search:
best_match: tuple|None = matcher.find_best_match(input_track=input_track, best_match: Optional[tuple] = matcher.find_best_match(input_track=input_track,
candidate_tracks=result_tracks) candidate_tracks=result_tracks)
else: else:
best_match = (result_tracks[0], 100) best_match = (result_tracks[0], 100)
@ -315,5 +317,4 @@ class Cache:
await self.redis_cache.increment_found_count(self.label) await self.redis_cache.increment_found_count(self.label)
return matched return matched
except: except:
traceback.print_exc() traceback.print_exc()
return

View File

@ -23,7 +23,9 @@ class InvalidResponseException(Exception):
""" """
class Genius: class Genius:
"""Genius Search Module""" """
Genius Search Module
"""
def __init__(self) -> None: def __init__(self) -> None:
self.label: str = "Genius" self.label: str = "Genius"
self.genius_url: str = private.GENIUS_URL self.genius_url: str = private.GENIUS_URL
@ -36,14 +38,15 @@ class Genius:
self.redis_cache = redis_cache.RedisCache() self.redis_cache = redis_cache.RedisCache()
# pylint: disable=unused-argument # pylint: disable=unused-argument
async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]: async def search(self, artist: str, song: str,
**kwargs) -> Optional[LyricsResult]:
""" """
Genius Search Genius Search
Args: Args:
artist (str): the artist to search artist (str): the artist to search
song (str): the song to search song (str): the song to search
Returns: Returns:
LyricsResult|None: The result, if found - None otherwise. Optional[LyricsResult]: The result, if found - None otherwise.
""" """
try: try:
# pylint: enable=unused-argument # pylint: enable=unused-argument
@ -59,7 +62,10 @@ class Genius:
timeout=self.timeout, timeout=self.timeout,
headers=self.headers) as request: headers=self.headers) as request:
request.raise_for_status() request.raise_for_status()
text: str|None = await request.text() text: Optional[str] = await request.text()
if not text:
raise InvalidResponseException("No search response.")
if len(text) < 100: if len(text) < 100:
raise InvalidResponseException("Search response text was invalid (len < 100 chars.)") raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
@ -94,14 +100,17 @@ class Genius:
timeout=self.timeout, timeout=self.timeout,
headers=self.headers) as scrape_request: headers=self.headers) as scrape_request:
scrape_request.raise_for_status() scrape_request.raise_for_status()
scrape_text: str|None = await scrape_request.text() scrape_text: Optional[str] = await scrape_request.text()
if not scrape_text:
raise InvalidResponseException("No scrape response.")
if len(scrape_text) < 100: if len(scrape_text) < 100:
raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)") raise InvalidResponseException("Scrape response was invalid (len < 100 chars.)")
html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser") html = BeautifulSoup(htm.unescape(scrape_text).replace('<br/>', '\n'), "html.parser")
divs: ResultSet|None = html.find_all("div", {"data-lyrics-container": "true"}) divs: Optional[ResultSet] = html.find_all("div", {"data-lyrics-container": "true"})
if not divs: if not divs:
return return
@ -124,8 +133,5 @@ class Genius:
await self.redis_cache.increment_found_count(self.label) await self.redis_cache.increment_found_count(self.label)
await self.cache.store(matched) await self.cache.store(matched)
return matched return matched
except: except:
# if log_level == "DEBUG": traceback.print_exc()
traceback.print_exc()
return

View File

@ -32,14 +32,15 @@ class LRCLib:
self.cache = cache.Cache() self.cache = cache.Cache()
self.redis_cache = redis_cache.RedisCache() self.redis_cache = redis_cache.RedisCache()
async def search(self, artist: str, song: str, plain: bool = True) -> Optional[LyricsResult]: async def search(self, artist: str, song: str,
plain: Optional[bool] = True) -> Optional[LyricsResult]:
""" """
LRCLib Search LRCLib Search
Args: Args:
artist (str): the artist to search artist (str): the artist to search
song (str): the song to search song (str): the song to search
Returns: Returns:
LyricsResult|None: The result, if found - None otherwise. Optional[LyricsResult]: The result, if found - None otherwise.
""" """
try: try:
artist: str = artist.strip().lower() artist: str = artist.strip().lower()
@ -61,12 +62,16 @@ class LRCLib:
timeout=self.timeout, timeout=self.timeout,
headers=self.headers) as request: headers=self.headers) as request:
request.raise_for_status() request.raise_for_status()
text: str|None = await request.text()
text: Optional[str] = await request.text()
if not text:
raise InvalidResponseException("No search response.")
if len(text) < 100: if len(text) < 100:
raise InvalidResponseException("Search response text was invalid (len < 100 chars.)") raise InvalidResponseException("Search response text was invalid (len < 100 chars.)")
search_data: dict|None = await request.json() search_data: Optional[dict] = await request.json()
if not isinstance(search_data, dict):
raise InvalidResponseException("No JSON search data.")
# logging.info("Search Data:\n%s", search_data) # logging.info("Search Data:\n%s", search_data)
@ -125,5 +130,4 @@ class LRCLib:
await self.cache.store(matched) await self.cache.store(matched)
return matched return matched
except: except:
traceback.print_exc() traceback.print_exc()
return

View File

@ -1,7 +1,4 @@
#!/usr/bin/env python3.12 #!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught, wrong-import-order
# pylint: disable=wrong-import-position
import logging import logging
import traceback import traceback
@ -9,7 +6,9 @@ import json
import time import time
import sys import sys
import regex import regex
from regex import Pattern
import asyncio import asyncio
from typing import Union, Optional
sys.path.insert(1,'..') sys.path.insert(1,'..')
from lyric_search import notifier from lyric_search import notifier
from lyric_search.constructors import LyricsResult from lyric_search.constructors import LyricsResult
@ -20,10 +19,6 @@ from redis.commands.search.field import TextField, TagField
from redis.commands.json.path import Path from redis.commands.json.path import Path
from . import private from . import private
logger = logging.getLogger() logger = logging.getLogger()
log_level = logging.getLevelName(logger.level) log_level = logging.getLevelName(logger.level)
@ -41,14 +36,15 @@ class RedisCache:
self.redis_client = redis.Redis(password=private.REDIS_PW) self.redis_client = redis.Redis(password=private.REDIS_PW)
self.notifier = notifier.DiscordNotifier() self.notifier = notifier.DiscordNotifier()
self.notify_warnings = True self.notify_warnings = True
self.regexes = [ self.regexes: list[Pattern] = [
regex.compile(r'\-'), regex.compile(r'\-'),
regex.compile(r'[^a-zA-Z0-9\s]'), regex.compile(r'[^a-zA-Z0-9\s]'),
] ]
try: try:
asyncio.get_event_loop().create_task(self.create_index()) asyncio.get_event_loop().create_task(self.create_index())
except: except Exception as e:
pass logging.debug("Failed to create redis create_index task: %s",
str(e))
async def create_index(self) -> None: async def create_index(self) -> None:
"""Create Index""" """Create Index"""
@ -64,10 +60,11 @@ class RedisCache:
if str(result) != "OK": if str(result) != "OK":
raise RedisException(f"Redis: Failed to create index: {result}") raise RedisException(f"Redis: Failed to create index: {result}")
except Exception as e: except Exception as e:
pass logging.debug("Failed to create redis index: %s",
# await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", f"Failed to create idx: {str(e)}") str(e))
def sanitize_input(self, artist: str, song: str, fuzzy: bool = False) -> tuple[str, str]: def sanitize_input(self, artist: str, song: str,
fuzzy: Optional[bool] = False) -> tuple[str, str]:
""" """
Sanitize artist/song input (convert to redis matchable fuzzy query) Sanitize artist/song input (convert to redis matchable fuzzy query)
Args: Args:
@ -121,7 +118,9 @@ class RedisCache:
traceback.print_exc() traceback.print_exc()
async def search(self, **kwargs) -> list[tuple]: async def search(self, artist: Optional[str] = None,
song: Optional[str] = None,
lyrics: Optional[str] = None) -> list[tuple]:
""" """
Search Redis Cache Search Redis Cache
Args: Args:
@ -133,9 +132,6 @@ class RedisCache:
""" """
try: try:
artist = kwargs.get('artist', '')
song = kwargs.get('song', '')
lyrics = kwargs.get('lyrics')
fuzzy_artist = None fuzzy_artist = None
fuzzy_song = None fuzzy_song = None
is_random_search = artist == "!" and song == "!" is_random_search = artist == "!" and song == "!"
@ -148,10 +144,10 @@ class RedisCache:
logging.debug("Redis: Searching normally first") logging.debug("Redis: Searching normally first")
(artist, song) = self.sanitize_input(artist, song) (artist, song) = self.sanitize_input(artist, song)
logging.debug("Seeking: %s - %s", artist, song) logging.debug("Seeking: %s - %s", artist, song)
search_res = await self.redis_client.ft().search(Query( search_res: Union[dict, list] = await self.redis_client.ft().search(Query(
f"@artist:{artist} @song:{song}" f"@artist:{artist} @song:{song}"
)) ))
search_res_out = [(result['id'].split(":", search_res_out: list[tuple] = [(result['id'].split(":",
maxsplit=1)[1], dict(json.loads(result['json']))) maxsplit=1)[1], dict(json.loads(result['json'])))
for result in search_res.docs] for result in search_res.docs]
if not search_res_out: if not search_res_out:
@ -167,8 +163,8 @@ class RedisCache:
for result in search_res.docs] for result in search_res.docs]
else: else:
random_redis_key = await self.redis_client.randomkey() random_redis_key: str = await self.redis_client.randomkey()
out_id = str(random_redis_key).split(":", out_id: str = str(random_redis_key).split(":",
maxsplit=1)[1][:-1] maxsplit=1)[1][:-1]
search_res = await self.redis_client.json().get(random_redis_key) search_res = await self.redis_client.json().get(random_redis_key)
search_res_out = [(out_id, search_res)] search_res_out = [(out_id, search_res)]
@ -179,7 +175,8 @@ class RedisCache:
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
# await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", f"{str(e)}\nSearch was: {artist} - {song}; fuzzy: {fuzzy_artist} - {fuzzy_song}") # await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", f"{str(e)}\nSearch was: {artist} - {song}; fuzzy: {fuzzy_artist} - {fuzzy_song}")
async def redis_store(self, sqlite_id: int, lyr_result: LyricsResult) -> None: async def redis_store(self, sqlite_id: int,
lyr_result: LyricsResult) -> None:
""" """
Store lyrics to redis cache Store lyrics to redis cache
Args: Args:
@ -191,7 +188,7 @@ class RedisCache:
try: try:
(search_artist, search_song) = self.sanitize_input(lyr_result.artist, (search_artist, search_song) = self.sanitize_input(lyr_result.artist,
lyr_result.song) lyr_result.song)
redis_mapping = { redis_mapping: dict = {
'id': sqlite_id, 'id': sqlite_id,
'src': lyr_result.src, 'src': lyr_result.src,
'date_retrieved': time.time(), 'date_retrieved': time.time(),
@ -206,8 +203,8 @@ class RedisCache:
'tags': '(none)', 'tags': '(none)',
'liked': 0, 'liked': 0,
} }
newkey = f"lyrics:000{sqlite_id}" newkey: str = f"lyrics:000{sqlite_id}"
jsonset = await self.redis_client.json().set(newkey, Path.root_path(), jsonset: bool = await self.redis_client.json().set(newkey, Path.root_path(),
redis_mapping) redis_mapping)
if not jsonset: if not jsonset:
raise RedisException(f"Failed to store {lyr_result.artist} - {lyr_result.song} (SQLite id: {sqlite_id}) to redis:\n{jsonset}") raise RedisException(f"Failed to store {lyr_result.artist} - {lyr_result.song} (SQLite id: {sqlite_id}) to redis:\n{jsonset}")

View File

@ -1,9 +1,10 @@
#!/usr/bin/env python3.12 #!/usr/bin/env python3.12
from difflib import SequenceMatcher from difflib import SequenceMatcher
from typing import List, Optional, Tuple from typing import List, Optional, Union, Any
import logging import logging
import regex import regex
from regex import Pattern
class TrackMatcher: class TrackMatcher:
"""Track Matcher""" """Track Matcher"""
@ -17,7 +18,7 @@ class TrackMatcher:
""" """
self.threshold = threshold self.threshold = threshold
def find_best_match(self, input_track: str, candidate_tracks: List[tuple[int|str, str]]) -> Optional[Tuple[str, float]]: def find_best_match(self, input_track: str, candidate_tracks: List[tuple[int|str, str]]) -> Optional[tuple]:
""" """
Find the best matching track from the candidate list. Find the best matching track from the candidate list.
@ -26,7 +27,7 @@ class TrackMatcher:
candidate_tracks (List[tuple[int|str, str]]): List of candidate tracks candidate_tracks (List[tuple[int|str, str]]): List of candidate tracks
Returns: Returns:
Optional[Tuple[int, str, float]]: Tuple of (best matching track, similarity score) Optional[tuple[int, str, float]]: Tuple of (best matching track, similarity score)
or None if no good match found or None if no good match found
""" """
@ -38,7 +39,7 @@ class TrackMatcher:
input_track = self._normalize_string(input_track) input_track = self._normalize_string(input_track)
best_match = None best_match = None
best_score = 0 best_score: float = 0.0
for candidate in candidate_tracks: for candidate in candidate_tracks:
normalized_candidate = self._normalize_string(candidate[1]) normalized_candidate = self._normalize_string(candidate[1])
@ -56,7 +57,10 @@ class TrackMatcher:
best_match = candidate best_match = candidate
# Return the match only if it meets the threshold # Return the match only if it meets the threshold
return (best_match, round(best_score * 100)) if best_score >= self.threshold else None if best_score >= self.threshold:
return None
match: tuple = (best_match, round(best_score * 100))
return match
def _normalize_string(self, text: str) -> str: def _normalize_string(self, text: str) -> str:
""" """
@ -98,10 +102,14 @@ class DataUtils:
Data Utils Data Utils
""" """
def __init__(self): def __init__(self) -> None:
self.lrc_regex = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}') self.lrc_regex = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}')
self.scrub_regex_1: Pattern = regex.compile(r'(\[.*?\])(\s){0,}(\:){0,1}')
self.scrub_regex_2: Pattern = regex.compile(r'(\d?)(Embed\b)',
flags=regex.IGNORECASe)
self.scrub_regex_3: Pattern = regex.compile(r'\n{2}')
self.scrub_regex_4: Pattern = regex.compile(r'[0-9]\b$')
def scrub_lyrics(self, lyrics: str) -> str: def scrub_lyrics(self, lyrics: str) -> str:
""" """
Lyric Scrub Regex Chain Lyric Scrub Regex Chain
@ -110,10 +118,10 @@ class DataUtils:
Returns: Returns:
str: Regex scrubbed lyrics str: Regex scrubbed lyrics
""" """
lyrics = regex.sub(r'(\[.*?\])(\s){0,}(\:){0,1}', '', lyrics) lyrics = self.scrub_regex_1.sub('', lyrics)
lyrics = regex.sub(r'(\d?)(Embed\b)', '', lyrics, flags=regex.IGNORECASE) lyrics = self.scrub_regex_2.sub('', lyrics, flags=regex.IGNORECASE)
lyrics = regex.sub(r'\n{2}', '\n', lyrics) # Gaps between verses lyrics = self.scrub_regex_3.sub('\n', lyrics) # Gaps between verses
lyrics = regex.sub(r'[0-9]\b$', '', lyrics) lyrics = self.scrub_regex_3.sub('', lyrics)
return lyrics return lyrics
def create_lrc_object(self, lrc_str: str) -> list[dict]: def create_lrc_object(self, lrc_str: str) -> list[dict]:

35
util.py
View File

@ -1,29 +1,36 @@
#!/usr/bin/env python3.12 #!/usr/bin/env python3.12
import logging import logging
from typing import Optional
from fastapi import FastAPI, Response, HTTPException from fastapi import FastAPI, Response, HTTPException
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
class Utilities: class Utilities:
"""API Utilities""" """
API Utilities
"""
def __init__(self, app: FastAPI, constants): def __init__(self, app: FastAPI, constants):
self.constants = constants self.constants = constants
self.blocked_redirect_uri = "https://codey.lol" self.blocked_redirect_uri = "https://codey.lol"
self.app = app self.app = app
def get_blocked_response(self, path: str | None = None): # pylint: disable=unused-argument def get_blocked_response(self, path: Optional[str] = None):
"""Get Blocked HTTP Response""" """
Get Blocked HTTP Response
"""
logging.error("Rejected request: Blocked") logging.error("Rejected request: Blocked")
return RedirectResponse(url=self.blocked_redirect_uri) return RedirectResponse(url=self.blocked_redirect_uri)
def get_no_endpoint_found(self, path: str | None = None): # pylint: disable=unused-argument def get_no_endpoint_found(self, path: Optional[str] = None):
"""Get 404 Response""" """
Get 404 Response
"""
logging.error("Rejected request: No such endpoint") logging.error("Rejected request: No such endpoint")
raise HTTPException(detail="Unknown endpoint", status_code=404) raise HTTPException(detail="Unknown endpoint", status_code=404)
def check_key(self, path: str, key: str, req_type: int = 0): def check_key(self, path: str, key: str,
req_type: int = 0) -> bool:
""" """
Accepts path as an argument to allow fine tuning access for each API key, not currently in use. Accepts path as an argument to allow fine tuning access for each API key, not currently in use.
""" """
@ -31,19 +38,19 @@ class Utilities:
if not key or not key.startswith("Bearer "): if not key or not key.startswith("Bearer "):
return False return False
key = key.split("Bearer ", maxsplit=1)[1].strip() _key: str = key.split("Bearer ", maxsplit=1)[1].strip()
if not key in self.constants.API_KEYS: if not _key in self.constants.API_KEYS:
return False return False
if req_type == 2: if req_type == 2:
return key.startswith("PRV-") return _key.startswith("PRV-")
elif req_type == 4: elif req_type == 4:
return key.startswith("RAD-") return _key.startswith("RAD-")
if path.lower().startswith("/xc/") and not key.startswith("XC-"): if path.lower().startswith("/xc/")\
return False and not key.startswith("XC-"):
return False
return True return True