diff --git a/.gitignore b/.gitignore index df2918d..17123d5 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,6 @@ youtube* playlist_creator.py artist_genre_tag.py uv.lock -py.typed pyproject.toml mypy.ini .python-version \ No newline at end of file diff --git a/base.py b/base.py index f19b528..a96dae9 100644 --- a/base.py +++ b/base.py @@ -1,5 +1,6 @@ import importlib import sys + sys.path.insert(0, ".") import logging import asyncio @@ -12,58 +13,57 @@ logger = logging.getLogger() logger.setLevel(logging.INFO) loop = asyncio.get_event_loop() -app = FastAPI(title="codey.lol API", - version="1.0", - contact={ - 'name': 'codey' - }, - redirect_slashes=False, - loop=loop) +app = FastAPI( + title="codey.lol API", + version="1.0", + contact={"name": "codey"}, + redirect_slashes=False, + loop=loop, +) constants = importlib.import_module("constants").Constants() util = importlib.import_module("util").Utilities(app, constants) -origins = [ - "https://codey.lol", - "https://api.codey.lol" -] +origins = ["https://codey.lol", "https://api.codey.lol"] -app.add_middleware(CORSMiddleware, # type: ignore -allow_origins=origins, -allow_credentials=True, -allow_methods=["POST", "GET", "HEAD"], -allow_headers=["*"]) # type: ignore +app.add_middleware( + CORSMiddleware, # type: ignore + allow_origins=origins, + allow_credentials=True, + allow_methods=["POST", "GET", "HEAD"], + allow_headers=["*"], +) # type: ignore """ Blacklisted routes """ + @app.get("/", include_in_schema=False) def disallow_get(): return util.get_blocked_response() + @app.head("/", include_in_schema=False) def base_head(): return + @app.get("/{path}", include_in_schema=False) def disallow_get_any(request: Request, var: Any = None): - path = request.path_params['path'] - if not ( - isinstance(path, str) - and - path.split("/", maxsplit=1) == "widget" - ): + path = request.path_params["path"] + if not (isinstance(path, str) and path.split("/", maxsplit=1) == "widget"): return util.get_blocked_response() else: - logging.info("OK, %s", - path) + logging.info("OK, %s", path) + @app.post("/", include_in_schema=False) def disallow_base_post(): return util.get_blocked_response() + """ End Blacklisted Routes """ @@ -73,20 +73,28 @@ Actionable Routes """ routes: dict = { - 'randmsg': importlib.import_module("endpoints.rand_msg").RandMsg(app, util, constants), - 'transcriptions': importlib.import_module("endpoints.transcriptions").Transcriptions(app, util, constants), - 'lyrics': importlib.import_module("endpoints.lyric_search").LyricSearch(app, util, constants), - 'lastfm': importlib.import_module("endpoints.lastfm").LastFM(app, util, constants), - 'yt': importlib.import_module("endpoints.yt").YT(app, util, constants), - 'karma': importlib.import_module("endpoints.karma").Karma(app, util, constants), - 'radio': importlib.import_module("endpoints.radio").Radio(app, util, constants), - 'mgr': importlib.import_module("endpoints.mgr.mgr_test").Mgr(app, util, constants), + "randmsg": importlib.import_module("endpoints.rand_msg").RandMsg( + app, util, constants + ), + "transcriptions": importlib.import_module( + "endpoints.transcriptions" + ).Transcriptions(app, util, constants), + "lyrics": importlib.import_module("endpoints.lyric_search").LyricSearch( + app, util, constants + ), + "lastfm": importlib.import_module("endpoints.lastfm").LastFM(app, util, constants), + "yt": importlib.import_module("endpoints.yt").YT(app, util, constants), + "karma": importlib.import_module("endpoints.karma").Karma(app, util, constants), + "radio": importlib.import_module("endpoints.radio").Radio(app, util, constants), + "mgr": importlib.import_module("endpoints.mgr.mgr_test").Mgr(app, util, constants), } # Misc endpoint depends on radio endpoint instance -radio_endpoint = routes.get('radio') +radio_endpoint = routes.get("radio") if radio_endpoint: - routes['misc'] = importlib.import_module("endpoints.misc").Misc(app, util, constants, radio_endpoint) + routes["misc"] = importlib.import_module("endpoints.misc").Misc( + app, util, constants, radio_endpoint + ) """ End Actionable Routes @@ -98,5 +106,4 @@ Startup """ redis = redis_cache.RedisCache() -loop.create_task( - redis.create_index()) \ No newline at end of file +loop.create_task(redis.create_index()) diff --git a/endpoints/constructors.py b/endpoints/constructors.py index 88fc85a..c44b2a3 100644 --- a/endpoints/constructors.py +++ b/endpoints/constructors.py @@ -5,6 +5,7 @@ from pydantic import BaseModel Karma """ + class ValidKarmaUpdateRequest(BaseModel): """ Requires authentication @@ -25,35 +26,42 @@ class ValidKarmaRetrievalRequest(BaseModel): keyword: str + class ValidTopKarmaRequest(BaseModel): """ - **n**: Number of top results to return (default: 10) """ + n: Optional[int] = 10 + """ LastFM """ + class LastFMException(Exception): pass + class ValidArtistSearchRequest(BaseModel): """ - **a**: artist name """ a: str - + model_config = { - "json_schema_extra": { + "json_schema_extra": { "examples": [ { "a": "eminem", - }] + } + ] } } - + + class ValidAlbumDetailRequest(BaseModel): """ - **a**: artist name @@ -64,15 +72,17 @@ class ValidAlbumDetailRequest(BaseModel): release: str model_config = { - "json_schema_extra": { + "json_schema_extra": { "examples": [ { "a": "eminem", "release": "houdini", - }] + } + ] } } - + + class ValidTrackInfoRequest(BaseModel): """ - **a**: artist name @@ -81,21 +91,24 @@ class ValidTrackInfoRequest(BaseModel): a: str t: str - + model_config = { - "json_schema_extra": { + "json_schema_extra": { "examples": [ { "a": "eminem", "t": "rap god", - }] + } + ] } - } - + } + + """ Rand Msg """ + class RandMsgRequest(BaseModel): """ - **short**: Short randmsg? @@ -103,10 +116,12 @@ class RandMsgRequest(BaseModel): short: Optional[bool] = False + """ YT """ + class ValidYTSearchRequest(BaseModel): """ - **t**: title to search @@ -114,10 +129,12 @@ class ValidYTSearchRequest(BaseModel): t: str = "rick astley - never gonna give you up" + """ XC """ + class ValidXCRequest(BaseModel): """ - **key**: valid XC API key @@ -129,12 +146,14 @@ class ValidXCRequest(BaseModel): key: str bid: int cmd: str - data: Optional[dict] + data: Optional[dict] + """ Transcriptions """ + class ValidShowEpisodeListRequest(BaseModel): """ - **s**: show id @@ -142,6 +161,7 @@ class ValidShowEpisodeListRequest(BaseModel): s: int + class ValidShowEpisodeLineRequest(BaseModel): """ - **s**: show id @@ -151,10 +171,12 @@ class ValidShowEpisodeLineRequest(BaseModel): s: int e: int + """ Lyric Search """ + class ValidLyricRequest(BaseModel): """ - **a**: artist @@ -167,7 +189,7 @@ class ValidLyricRequest(BaseModel): - **excluded_sources**: sources to exclude (new only) """ - a: Optional[str] = None + a: Optional[str] = None s: Optional[str] = None t: Optional[str] = None sub: Optional[str] = None @@ -178,32 +200,37 @@ class ValidLyricRequest(BaseModel): model_config = { "json_schema_extra": { - "examples": [ - { - "a": "eminem", - "s": "rap god", - "src": "WEB", - "extra": True, - "lrc": False, - "excluded_sources": [], - }] - } + "examples": [ + { + "a": "eminem", + "s": "rap god", + "src": "WEB", + "extra": True, + "lrc": False, + "excluded_sources": [], + } + ] } + } + - class ValidTypeAheadRequest(BaseModel): """ - **query**: query string """ + query: str + """ Radio """ + class RadioException(Exception): pass + class ValidRadioSongRequest(BaseModel): """ - **key**: API Key @@ -212,55 +239,65 @@ class ValidRadioSongRequest(BaseModel): - **artistsong**: may be used IN PLACE OF artist/song to perform a combined/string search in the format "artist - song" - **alsoSkip**: Whether to skip immediately to this track [not implemented] """ + key: str artist: Optional[str] = None song: Optional[str] = None artistsong: Optional[str] = None alsoSkip: Optional[bool] = False - + + class ValidRadioTypeaheadRequest(BaseModel): """ - **query**: Typeahead query """ + query: str - - + + class ValidRadioQueueGetRequest(BaseModel): """ - **key**: API key (optional, needed if specifying a non-default limit) - **limit**: optional, default: 15k """ - + key: Optional[str] = None limit: Optional[int] = 15_000 - + + class ValidRadioNextRequest(BaseModel): """ - **key**: API Key - **skipTo**: UUID to skip to [optional] """ + key: str skipTo: Optional[str] = None - + + class ValidRadioReshuffleRequest(ValidRadioNextRequest): """ - **key**: API Key """ - + + class ValidRadioQueueShiftRequest(BaseModel): """ - **key**: API Key - - **uuid**: UUID to shift + - **uuid**: UUID to shift - **next**: Play next if true, immediately if false, default False """ + key: str uuid: str next: Optional[bool] = False - + + class ValidRadioQueueRemovalRequest(BaseModel): """ - **key**: API Key - - **uuid**: UUID to remove + - **uuid**: UUID to remove """ + key: str - uuid: str \ No newline at end of file + uuid: str diff --git a/endpoints/karma.py b/endpoints/karma.py index e95d2b1..12bb501 100644 --- a/endpoints/karma.py +++ b/endpoints/karma.py @@ -7,16 +7,22 @@ import aiosqlite as sqlite3 from typing import LiteralString, Optional, Union from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse -from .constructors import (ValidTopKarmaRequest, ValidKarmaRetrievalRequest, - ValidKarmaUpdateRequest) +from .constructors import ( + ValidTopKarmaRequest, + ValidKarmaRetrievalRequest, + ValidKarmaUpdateRequest, +) + class KarmaDB: """Karma DB Util""" - def __init__(self) -> None: - self.db_path: LiteralString = os.path.join("/", "usr", "local", "share", - "sqlite_dbs", "karma.db") - async def get_karma(self, keyword: str) -> Union[int, dict]: + def __init__(self) -> None: + self.db_path: LiteralString = os.path.join( + "/", "usr", "local", "share", "sqlite_dbs", "karma.db" + ) + + async def get_karma(self, keyword: str) -> Union[int, dict]: """Get Karma Value for Keyword Args: keyword (str): The keyword to search @@ -24,16 +30,18 @@ class KarmaDB: Union[int, dict] """ async with sqlite3.connect(self.db_path, timeout=2) as db_conn: - async with await db_conn.execute("SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)) as db_cursor: + async with await db_conn.execute( + "SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,) + ) as db_cursor: try: (score,) = await db_cursor.fetchone() return score except TypeError: return { - 'err': True, - 'errorText': f'No records for {keyword}', + "err": True, + "errorText": f"No records for {keyword}", } - + async def get_top(self, n: Optional[int] = 10) -> Optional[list[tuple]]: """ Get Top n=10 Karma Entries @@ -44,14 +52,17 @@ class KarmaDB: """ try: async with sqlite3.connect(self.db_path, timeout=2) as db_conn: - async with await db_conn.execute("SELECT keyword, score FROM karma ORDER BY score DESC LIMIT ?", (n,)) as db_cursor: + async with await db_conn.execute( + "SELECT keyword, score FROM karma ORDER BY score DESC LIMIT ?", (n,) + ) as db_cursor: return await db_cursor.fetchall() except: traceback.print_exc() return None - - async def update_karma(self, granter: str, keyword: str, - flag: int) -> Optional[bool]: + + async def update_karma( + self, granter: str, keyword: str, flag: int + ) -> Optional[bool]: """ Update Karma for Keyword Args: @@ -61,42 +72,71 @@ class KarmaDB: Returns: Optional[bool] """ - + if not flag in [0, 1]: return None - + modifier: str = "score + 1" if not flag else "score - 1" - query: str = f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?" - new_keyword_query: str = "INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)" + query: str = ( + f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?" + ) + new_keyword_query: str = ( + "INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)" + ) friendly_flag: str = "++" if not flag else "--" - audit_message: str = f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}" - audit_query: str = "INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)" + audit_message: str = ( + f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}" + ) + audit_query: str = ( + "INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)" + ) now: int = int(time.time()) logging.debug("Audit message: %s{audit_message}\nKeyword: %s{keyword}") async with sqlite3.connect(self.db_path, timeout=2) as db_conn: - async with await db_conn.execute(audit_query, (keyword, audit_message,)) as db_cursor: - await db_conn.commit() - async with await db_conn.execute(query, (now, keyword,)) as db_cursor: + async with await db_conn.execute( + audit_query, + ( + keyword, + audit_message, + ), + ) as db_cursor: + await db_conn.commit() + async with await db_conn.execute( + query, + ( + now, + keyword, + ), + ) as db_cursor: if db_cursor.rowcount: await db_conn.commit() return True if db_cursor.rowcount < 1: # Keyword does not already exist await db_cursor.close() new_val = 1 if not flag else -1 - async with await db_conn.execute(new_keyword_query, (keyword, new_val, now,)) as db_cursor: + async with await db_conn.execute( + new_keyword_query, + ( + keyword, + new_val, + now, + ), + ) as db_cursor: if db_cursor.rowcount >= 1: await db_conn.commit() return True else: return False return False - + + class Karma(FastAPI): """ Karma Endpoints - """ + """ + def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util @@ -107,86 +147,111 @@ class Karma(FastAPI): "karma/get": self.get_karma_handler, "karma/modify": self.modify_karma_handler, "karma/top": self.top_karma_handler, - } + } for endpoint, handler in self.endpoints.items(): - app.add_api_route(f"/{endpoint}", handler, methods=["POST"], - include_in_schema=True) + app.add_api_route( + f"/{endpoint}", handler, methods=["POST"], include_in_schema=True + ) - - async def top_karma_handler(self, request: Request, - data: Optional[ValidTopKarmaRequest] = None) -> JSONResponse: + async def top_karma_handler( + self, request: Request, data: Optional[ValidTopKarmaRequest] = None + ) -> JSONResponse: """ Get top keywords for karma - **n**: Number of top results to return (default: 10) """ - if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')): + if not self.util.check_key( + request.url.path, request.headers.get("X-Authd-With") + ): raise HTTPException(status_code=403, detail="Unauthorized") - + n: int = 10 if data and data.n: n = int(data.n) - try: top10: Optional[list[tuple]] = await self.db.get_top(n=n) if not top10: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'General failure', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "General failure", + }, + ) return JSONResponse(content=top10) except: traceback.print_exc() - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Exception occurred.', - }) - - async def get_karma_handler(self, data: ValidKarmaRetrievalRequest, - request: Request) -> JSONResponse: + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Exception occurred.", + }, + ) + + async def get_karma_handler( + self, data: ValidKarmaRetrievalRequest, request: Request + ) -> JSONResponse: """ Get current karma value - **keyword**: Keyword to retrieve karma value for """ - - if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')): - raise HTTPException(status_code=403, detail="Unauthorized") + + if not self.util.check_key( + request.url.path, request.headers.get("X-Authd-With") + ): + raise HTTPException(status_code=403, detail="Unauthorized") keyword: str = data.keyword try: count: Union[int, dict] = await self.db.get_karma(keyword) - return JSONResponse(content={ - 'keyword': keyword, - 'count': count, - }) + return JSONResponse( + content={ + "keyword": keyword, + "count": count, + } + ) except: traceback.print_exc() - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': "Exception occurred.", - }) - - async def modify_karma_handler(self, data: ValidKarmaUpdateRequest, - request: Request) -> JSONResponse: + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Exception occurred.", + }, + ) + + async def modify_karma_handler( + self, data: ValidKarmaUpdateRequest, request: Request + ) -> JSONResponse: """ Update karma count - **granter**: User who granted the karma - **keyword**: The keyword to modify - - **flag**: 0 to decrement (--), 1 to increment (++) + - **flag**: 0 to decrement (--), 1 to increment (++) """ - if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With'), 2): + if not self.util.check_key( + request.url.path, request.headers.get("X-Authd-With"), 2 + ): raise HTTPException(status_code=403, detail="Unauthorized") - + if not data.flag in [0, 1]: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request', - }) - - return JSONResponse(content={ - 'success': await self.db.update_karma(data.granter, - data.keyword, data.flag) - }) \ No newline at end of file + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) + + return JSONResponse( + content={ + "success": await self.db.update_karma( + data.granter, data.keyword, data.flag + ) + } + ) diff --git a/endpoints/lastfm.py b/endpoints/lastfm.py index 6925c9a..afcce1f 100644 --- a/endpoints/lastfm.py +++ b/endpoints/lastfm.py @@ -3,13 +3,18 @@ import traceback from typing import Optional, Union from fastapi import FastAPI from fastapi.responses import JSONResponse -from .constructors import (ValidArtistSearchRequest, ValidAlbumDetailRequest, - ValidTrackInfoRequest, LastFMException) - +from .constructors import ( + ValidArtistSearchRequest, + ValidAlbumDetailRequest, + ValidTrackInfoRequest, + LastFMException, +) + + class LastFM(FastAPI): - """Last.FM Endpoints""" - def __init__(self, app: FastAPI, - util, constants) -> None: + """Last.FM Endpoints""" + + def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util self.constants = constants @@ -21,72 +26,94 @@ class LastFM(FastAPI): "lastfm/get_release": self.release_detail_handler, "lastfm/get_release_tracklist": self.release_tracklist_handler, "lastfm/get_track_info": self.track_info_handler, - #tbd - } + # tbd + } for endpoint, handler in self.endpoints.items(): - app.add_api_route(f"/{endpoint}", handler, methods=["POST"], - include_in_schema=True) - - async def artist_by_name_handler(self, data: ValidArtistSearchRequest) -> JSONResponse: + app.add_api_route( + f"/{endpoint}", handler, methods=["POST"], include_in_schema=True + ) + + async def artist_by_name_handler( + self, data: ValidArtistSearchRequest + ) -> JSONResponse: """ Get artist info - **a**: Artist to search """ artist: Optional[str] = data.a.strip() if not artist: - return JSONResponse(content={ - 'err': True, - 'errorText': 'No artist specified', - }) - + return JSONResponse( + content={ + "err": True, + "errorText": "No artist specified", + } + ) + artist_result = await self.lastfm.search_artist(artist=artist) - if not artist_result or not artist_result.get('bio')\ - or "err" in artist_result.keys(): - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Search failed (no results?)', - }) - - return JSONResponse(content={ - 'success': True, - 'result': artist_result, - }) - - async def artist_album_handler(self, data: ValidArtistSearchRequest) -> JSONResponse: + if ( + not artist_result + or not artist_result.get("bio") + or "err" in artist_result.keys() + ): + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Search failed (no results?)", + }, + ) + + return JSONResponse( + content={ + "success": True, + "result": artist_result, + } + ) + + async def artist_album_handler( + self, data: ValidArtistSearchRequest + ) -> JSONResponse: """ Get artist's albums/releases - **a**: Artist to search """ artist: str = data.a.strip() if not artist: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request: No artist specified', - }) - - album_result: Union[dict, list[dict]] = await self.lastfm.get_artist_albums(artist=artist) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request: No artist specified", + }, + ) + + album_result: Union[dict, list[dict]] = await self.lastfm.get_artist_albums( + artist=artist + ) if isinstance(album_result, dict): - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'General failure.', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "General failure.", + }, + ) album_result_out: list = [] seen_release_titles: list = [] - + for release in album_result: - release_title: str = release.get('title', 'Unknown') + release_title: str = release.get("title", "Unknown") if release_title.lower() in seen_release_titles: continue seen_release_titles.append(release_title.lower()) album_result_out.append(release) - return JSONResponse(content={ - 'success': True, - 'result': album_result_out - }) + return JSONResponse(content={"success": True, "result": album_result_out}) - async def release_detail_handler(self, data: ValidAlbumDetailRequest) -> JSONResponse: + async def release_detail_handler( + self, data: ValidAlbumDetailRequest + ) -> JSONResponse: """ Get details of a particular release by an artist - **a**: Artist to search @@ -94,85 +121,105 @@ class LastFM(FastAPI): """ artist: str = data.a.strip() release: str = data.release.strip() - + if not artist or not release: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request', - }) - + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) + release_result = await self.lastfm.get_release(artist=artist, album=release) ret_obj = { - 'id': release_result.get('id'), - 'artists': release_result.get('artists'), - 'title': release_result.get('title'), - 'summary': release_result.get('summary'), - 'tracks': release_result.get('tracks'), - } + "id": release_result.get("id"), + "artists": release_result.get("artists"), + "title": release_result.get("title"), + "summary": release_result.get("summary"), + "tracks": release_result.get("tracks"), + } - return JSONResponse(content={ - 'success': True, - 'result': ret_obj, - }) - - async def release_tracklist_handler(self, data: ValidAlbumDetailRequest) -> JSONResponse: + return JSONResponse( + content={ + "success": True, + "result": ret_obj, + } + ) + + async def release_tracklist_handler( + self, data: ValidAlbumDetailRequest + ) -> JSONResponse: """ Get track list for a particular release by an artist - **a**: Artist to search - - **release**: Release title to search + - **release**: Release title to search """ artist: str = data.a.strip() release: str = data.release.strip() - + if not artist or not release: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request', - }) - - tracklist_result: dict = await self.lastfm.get_album_tracklist(artist=artist, album=release) - return JSONResponse(content={ - 'success': True, - 'id': tracklist_result.get('id'), - 'artists': tracklist_result.get('artists'), - 'title': tracklist_result.get('title'), - 'summary': tracklist_result.get('summary'), - 'tracks': tracklist_result.get('tracks'), - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) + + tracklist_result: dict = await self.lastfm.get_album_tracklist( + artist=artist, album=release + ) + return JSONResponse( + content={ + "success": True, + "id": tracklist_result.get("id"), + "artists": tracklist_result.get("artists"), + "title": tracklist_result.get("title"), + "summary": tracklist_result.get("summary"), + "tracks": tracklist_result.get("tracks"), + } + ) async def track_info_handler(self, data: ValidTrackInfoRequest) -> JSONResponse: """ - Get track info from Last.FM given an artist/track - - **a**: Artist to search - - **t**: Track title to search - """ + Get track info from Last.FM given an artist/track + - **a**: Artist to search + - **t**: Track title to search + """ try: artist: str = data.a track: str = data.t - + if not artist or not track: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request' - }) - - track_info_result: Optional[dict] = await self.lastfm.get_track_info(artist=artist, - track=track) + return JSONResponse( + status_code=500, + content={"err": True, "errorText": "Invalid request"}, + ) + + track_info_result: Optional[dict] = await self.lastfm.get_track_info( + artist=artist, track=track + ) if not track_info_result: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Not found.', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Not found.", + }, + ) if "err" in track_info_result: - raise LastFMException("Unknown error occurred: %s", - track_info_result.get('errorText', '??')) - return JSONResponse(content={ - 'success': True, - 'result': track_info_result - }) + raise LastFMException( + "Unknown error occurred: %s", + track_info_result.get("errorText", "??"), + ) + return JSONResponse(content={"success": True, "result": track_info_result}) except: traceback.print_exc() - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'General error', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "General error", + }, + ) diff --git a/endpoints/lyric_search.py b/endpoints/lyric_search.py index 9e89027..30d975c 100644 --- a/endpoints/lyric_search.py +++ b/endpoints/lyric_search.py @@ -9,23 +9,25 @@ from typing import LiteralString, Optional, Union, Iterable from regex import Pattern from .constructors import ValidTypeAheadRequest, ValidLyricRequest from lyric_search.constructors import LyricsResult -from lyric_search.sources import aggregate +from lyric_search.sources import aggregate from lyric_search import notifier + class CacheUtils: """ Lyrics Cache DB Utils """ + def __init__(self) -> None: - self.lyrics_db_path: LiteralString = os.path.join("/usr/local/share", - "sqlite_dbs", "cached_lyrics.db") - + self.lyrics_db_path: LiteralString = os.path.join( + "/usr/local/share", "sqlite_dbs", "cached_lyrics.db" + ) + async def check_typeahead(self, query: str) -> Optional[list[str]]: """Lyric Search Typeahead DB Handler""" if not query: return None - async with sqlite3.connect(self.lyrics_db_path, - timeout=1) as _db: + async with sqlite3.connect(self.lyrics_db_path, timeout=1) as _db: _db.row_factory = sqlite3.Row db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\ (TRIM(artist) || " - " || TRIM(song)) as ret FROM lyrics WHERE\ @@ -33,9 +35,7 @@ class CacheUtils: db_params: tuple[str] = (f"%%%{query}%%%",) async with _db.execute(db_query, db_params) as _cursor: result: Iterable[sqlite3.Row] = await _cursor.fetchall() - out_result = [ - str(r['ret']) for r in result - ] + out_result = [str(r["ret"]) for r in result] return out_result @@ -43,18 +43,17 @@ class LyricSearch(FastAPI): """ Lyric Search Endpoint """ - def __init__(self, app: FastAPI, - util, constants) -> None: + + def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util self.constants = constants self.cache_utils = CacheUtils() self.notifier = notifier.DiscordNotifier() - self.endpoints: dict = { "typeahead/lyrics": self.typeahead_handler, - "lyric_search": self.lyric_search_handler, # Preserving old endpoint path temporarily + "lyric_search": self.lyric_search_handler, # Preserving old endpoint path temporarily "lyric/search": self.lyric_search_handler, } @@ -66,11 +65,18 @@ class LyricSearch(FastAPI): "IRC-SHARED", ] - self.lrc_regex: Pattern = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}') + self.lrc_regex: Pattern = regex.compile( + r"\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}" + ) for endpoint, handler in self.endpoints.items(): _schema_include = endpoint in ["lyric/search"] - app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include) + app.add_api_route( + f"/{endpoint}", + handler, + methods=["POST"], + include_in_schema=_schema_include, + ) async def typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse: """ @@ -78,104 +84,133 @@ class LyricSearch(FastAPI): - **query**: Typeahead query """ if not isinstance(data.query, str): - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request.', - }) - typeahead: Optional[list[str]] = await self.cache_utils.check_typeahead(data.query) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request.", + }, + ) + typeahead: Optional[list[str]] = await self.cache_utils.check_typeahead( + data.query + ) if not typeahead: return JSONResponse(content=[]) return JSONResponse(content=typeahead) - async def lyric_search_handler(self, data: ValidLyricRequest) -> JSONResponse: """ Search for lyrics - **a**: artist - **s**: song - - **t**: track (artist and song combined) [used only if a & s are not used] + - **t**: track (artist and song combined) [used only if a & s are not used] - **extra**: include extra details in response [optional, default: false] - - **lrc**: Request LRCs? + - **lrc**: Request LRCs? - **sub**: text to search within lyrics, if found lyrics will begin at found verse [optional, default: none] - **src**: the script/utility which initiated the request - **excluded_sources**: sources to exclude [optional, default: none] """ if (not data.a or not data.s) and not data.t or not data.src: raise HTTPException(detail="Invalid request", status_code=500) - + if data.src.upper() not in self.acceptable_request_sources: - await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", - f"Unknown request source: {data.src}") - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': f'Unknown request source: {data.src}', - }) - + await self.notifier.send( + f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", + f"Unknown request source: {data.src}", + ) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": f"Unknown request source: {data.src}", + }, + ) + if not data.t: search_artist: Optional[str] = data.a search_song: Optional[str] = data.s else: t_split: tuple = tuple(data.t.split(" - ", maxsplit=1)) (search_artist, search_song) = t_split - + if search_artist and search_song: - search_artist = str(self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip())) - search_song = str(self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip())) + search_artist = str( + self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip()) + ) + search_song = str( + self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip()) + ) search_artist = urllib.parse.unquote(search_artist) search_song = urllib.parse.unquote(search_song) if not isinstance(search_artist, str) or not isinstance(search_song, str): - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) excluded_sources: Optional[list] = data.excluded_sources aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources) plain_lyrics: bool = not data.lrc - result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search(search_artist, search_song, plain_lyrics) - + result: Optional[Union[LyricsResult, dict]] = await aggregate_search.search( + search_artist, search_song, plain_lyrics + ) + if not result: - return JSONResponse(content={ - 'err': True, - 'errorText': 'Sources exhausted, lyrics not located.', - }) - + return JSONResponse( + content={ + "err": True, + "errorText": "Sources exhausted, lyrics not located.", + } + ) + result = vars(result) - + if data.sub and not data.lrc: seeked_found_line: Optional[int] = None - lyric_lines: list[str] = result['lyrics'].strip().split(" / ") + lyric_lines: list[str] = result["lyrics"].strip().split(" / ") for i, line in enumerate(lyric_lines): - line = regex.sub(r'\u2064', '', line.strip()) + line = regex.sub(r"\u2064", "", line.strip()) if data.sub.strip().lower() in line.strip().lower(): seeked_found_line = i - logging.debug("Found %s at %s, match for %s!", - line, seeked_found_line, data.sub) # REMOVEME: DEBUG + logging.debug( + "Found %s at %s, match for %s!", + line, + seeked_found_line, + data.sub, + ) # REMOVEME: DEBUG break if not seeked_found_line: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Seek (a.k.a. subsearch) failed.', - 'failed_seek': True, - }) - result['lyrics'] = " / ".join(lyric_lines[seeked_found_line:]) - - result['confidence'] = int(result['confidence']) - result['time'] = f'{float(result['time']):.4f}' - + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Seek (a.k.a. subsearch) failed.", + "failed_seek": True, + }, + ) + result["lyrics"] = " / ".join(lyric_lines[seeked_found_line:]) + + result["confidence"] = int(result["confidence"]) + result["time"] = f"{float(result['time']):.4f}" + if plain_lyrics: - result['lyrics'] = regex.sub(r'(\s/\s|\n)', '
', result['lyrics']).strip() + result["lyrics"] = regex.sub( + r"(\s/\s|\n)", "
", result["lyrics"] + ).strip() else: # Swap lyrics key for 'lrc' - result['lrc'] = result['lyrics'] - result.pop('lyrics') + result["lrc"] = result["lyrics"] + result.pop("lyrics") - if "cache" in result['src']: - result['from_cache'] = True + if "cache" in result["src"]: + result["from_cache"] = True if not data.extra: - result.pop('src') - - return JSONResponse(content=result) \ No newline at end of file + result.pop("src") + + return JSONResponse(content=result) diff --git a/endpoints/misc.py b/endpoints/misc.py index eb5a197..46928ea 100644 --- a/endpoints/misc.py +++ b/endpoints/misc.py @@ -2,20 +2,18 @@ import logging import time import os from typing import Optional, Annotated -from fastapi import ( - FastAPI, Request, UploadFile, - Response, HTTPException, Form -) +from fastapi import FastAPI, Request, UploadFile, Response, HTTPException, Form from fastapi.responses import JSONResponse import redis.asyncio as redis from lyric_search.sources import private, cache as LyricsCache, redis_cache + class Misc(FastAPI): """ Misc Endpoints """ - def __init__(self, app: FastAPI, my_util, - constants, radio) -> None: + + def __init__(self, app: FastAPI, my_util, constants, radio) -> None: self.app: FastAPI = app self.util = my_util self.constants = constants @@ -28,47 +26,55 @@ class Misc(FastAPI): "widget/redis": self.homepage_redis_widget, "widget/sqlite": self.homepage_sqlite_widget, "widget/lyrics": self.homepage_lyrics_widget, - "widget/radio": self.homepage_radio_widget, + "widget/radio": self.homepage_radio_widget, "misc/get_activity_image": self.get_activity_image, } - + for endpoint, handler in self.endpoints.items(): - app.add_api_route(f"/{endpoint}", handler, methods=["GET"], - include_in_schema=True) - - app.add_api_route("/misc/upload_activity_image", - self.upload_activity_image, methods=["POST"]) - - async def upload_activity_image(self, - image: UploadFile, - key: Annotated[str, Form()], request: Request) -> Response: - if not key or not isinstance(key, str)\ - or not self.util.check_key(path=request.url.path, req_type=2, key=key): - raise HTTPException(status_code=403, detail="Unauthorized") + app.add_api_route( + f"/{endpoint}", handler, methods=["GET"], include_in_schema=True + ) + + app.add_api_route( + "/misc/upload_activity_image", self.upload_activity_image, methods=["POST"] + ) + + async def upload_activity_image( + self, image: UploadFile, key: Annotated[str, Form()], request: Request + ) -> Response: + if ( + not key + or not isinstance(key, str) + or not self.util.check_key(path=request.url.path, req_type=2, key=key) + ): + raise HTTPException(status_code=403, detail="Unauthorized") if not image: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) self.activity_image = await image.read() - return JSONResponse(content={ - 'success': True, - }) - + return JSONResponse( + content={ + "success": True, + } + ) + async def get_activity_image(self, request: Request) -> Response: if isinstance(self.activity_image, bytes): - return Response(content=self.activity_image, - media_type="image/png") - - + return Response(content=self.activity_image, media_type="image/png") + # Fallback - fallback_path = os.path.join("/var/www/codey.lol/public", - "images", "plex_placeholder.png") - - with open(fallback_path, 'rb') as f: - return Response(content=f.read(), - media_type="image/png") - + fallback_path = os.path.join( + "/var/www/codey.lol/public", "images", "plex_placeholder.png" + ) + + with open(fallback_path, "rb") as f: + return Response(content=f.read(), media_type="image/png") + async def get_radio_np(self) -> tuple[str, str, str]: """ Get radio now playing @@ -77,76 +83,94 @@ class Misc(FastAPI): Returns: str: Radio now playing in artist - song format """ - + np: dict = self.radio.radio_util.now_playing - artistsong: str = np.get('artistsong', 'N/A - N/A') - album: str = np.get('album', 'N/A') - genre: str = np.get('genre', 'N/A') + artistsong: str = np.get("artistsong", "N/A - N/A") + album: str = np.get("album", "N/A") + genre: str = np.get("genre", "N/A") return (artistsong, album, genre) - - + async def homepage_redis_widget(self) -> JSONResponse: """ Homepage Redis Widget Handler """ # Measure response time w/ test lyric search - time_start: float = time.time() # Start time for response_time - test_lyrics_result = await self.redis_client.ft().search("@artist: test @song: test") + time_start: float = time.time() # Start time for response_time + test_lyrics_result = await self.redis_client.ft().search( + "@artist: test @song: test" + ) time_end: float = time.time() # End response time test total_keys = await self.redis_client.dbsize() - response_time: float = time_end - time_start - (_, ci_keys) = await self.redis_client.scan(cursor=0, match="ci_session*", count=10000000) + response_time: float = time_end - time_start + (_, ci_keys) = await self.redis_client.scan( + cursor=0, match="ci_session*", count=10000000 + ) num_ci_keys = len(ci_keys) index_info = await self.redis_client.ft().info() - indexed_lyrics: int = index_info.get('num_docs') - return JSONResponse(content={ - 'responseTime': round(response_time, 7), - 'storedKeys': total_keys, - 'indexedLyrics': indexed_lyrics, - 'sessions': num_ci_keys, - }) - + indexed_lyrics: int = index_info.get("num_docs") + return JSONResponse( + content={ + "responseTime": round(response_time, 7), + "storedKeys": total_keys, + "indexedLyrics": indexed_lyrics, + "sessions": num_ci_keys, + } + ) + async def homepage_sqlite_widget(self) -> JSONResponse: """ Homepage SQLite Widget Handler - """ + """ row_count: int = await self.lyr_cache.sqlite_rowcount() distinct_artists: int = await self.lyr_cache.sqlite_distinct("artist") lyrics_length: int = await self.lyr_cache.sqlite_lyrics_length() - return JSONResponse(content={ - 'storedRows': row_count, - 'distinctArtists': distinct_artists, - 'lyricsLength': lyrics_length, - }) - + return JSONResponse( + content={ + "storedRows": row_count, + "distinctArtists": distinct_artists, + "lyricsLength": lyrics_length, + } + ) + async def homepage_lyrics_widget(self) -> JSONResponse: """ Homepage Lyrics Widget Handler """ found_counts: Optional[dict] = await self.redis_cache.get_found_counts() if not isinstance(found_counts, dict): - logging.info("DEBUG: Type of found counts from redis: %s\nContents: %s", - type(found_counts), found_counts) - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'General failure.', - }) + logging.info( + "DEBUG: Type of found counts from redis: %s\nContents: %s", + type(found_counts), + found_counts, + ) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "General failure.", + }, + ) return JSONResponse(content=found_counts) - + async def homepage_radio_widget(self) -> JSONResponse: """ Homepage Radio Widget Handler """ radio_np: tuple = await self.get_radio_np() if not radio_np: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'General failure.', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "General failure.", + }, + ) (artistsong, album, genre) = radio_np - return JSONResponse(content={ - 'now_playing': artistsong, - 'album': album, - 'genre': genre, - }) \ No newline at end of file + return JSONResponse( + content={ + "now_playing": artistsong, + "album": album, + "genre": genre, + } + ) diff --git a/endpoints/radio.py b/endpoints/radio.py index 226e525..098499f 100644 --- a/endpoints/radio.py +++ b/endpoints/radio.py @@ -4,21 +4,26 @@ import time import random import asyncio from utils import radio_util -from .constructors import (ValidRadioNextRequest, ValidRadioReshuffleRequest, - ValidRadioQueueShiftRequest, ValidRadioQueueRemovalRequest, - ValidRadioSongRequest, ValidRadioTypeaheadRequest, - RadioException) +from .constructors import ( + ValidRadioNextRequest, + ValidRadioReshuffleRequest, + ValidRadioQueueShiftRequest, + ValidRadioQueueRemovalRequest, + ValidRadioSongRequest, + ValidRadioTypeaheadRequest, + RadioException, +) from uuid import uuid4 as uuid from typing import Optional -from fastapi import (FastAPI, BackgroundTasks, Request, - Response, HTTPException) +from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException from fastapi.responses import RedirectResponse, JSONResponse + class Radio(FastAPI): """Radio Endpoints""" - def __init__(self, app: FastAPI, - my_util, constants) -> None: + + def __init__(self, app: FastAPI, my_util, constants) -> None: self.app: FastAPI = app self.util = my_util self.constants = constants @@ -33,22 +38,28 @@ class Radio(FastAPI): "radio/queue_shift": self.radio_queue_shift, "radio/reshuffle": self.radio_reshuffle, "radio/queue_remove": self.radio_queue_remove, - "radio/ls._next_": self.radio_get_next, + "radio/ls._next_": self.radio_get_next, } - + for endpoint, handler in self.endpoints.items(): - app.add_api_route(f"/{endpoint}", handler, methods=["POST"], - include_in_schema=True) - - # NOTE: Not in loop because method is GET for this endpoint - app.add_api_route("/radio/album_art", self.album_art_handler, methods=["GET"], - include_in_schema=True) - + app.add_api_route( + f"/{endpoint}", handler, methods=["POST"], include_in_schema=True + ) + + # NOTE: Not in loop because method is GET for this endpoint + app.add_api_route( + "/radio/album_art", + self.album_art_handler, + methods=["GET"], + include_in_schema=True, + ) + asyncio.get_event_loop().run_until_complete(self.radio_util.load_playlist()) - asyncio.get_event_loop().run_until_complete(self.radio_util._ls_skip()) - - async def radio_skip(self, data: ValidRadioNextRequest, - request: Request) -> JSONResponse: + asyncio.get_event_loop().run_until_complete(self.radio_util._ls_skip()) + + async def radio_skip( + self, data: ValidRadioNextRequest, request: Request + ) -> JSONResponse: """ Skip to the next track in the queue, or to uuid specified in skipTo if provided - **key**: API key @@ -58,45 +69,54 @@ class Radio(FastAPI): if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): raise HTTPException(status_code=403, detail="Unauthorized") if data.skipTo: - queue_item = self.radio_util.get_queue_item_by_uuid(data.skipTo) + queue_item = self.radio_util.get_queue_item_by_uuid(data.skipTo) if not queue_item: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'No such queue item.', - }) - self.radio_util.active_playlist = self.radio_util.active_playlist[queue_item[0]:] + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "No such queue item.", + }, + ) + self.radio_util.active_playlist = self.radio_util.active_playlist[ + queue_item[0] : + ] if not self.radio_util.active_playlist: await self.radio_util.load_playlist() skip_result: bool = await self.radio_util._ls_skip() status_code = 200 if skip_result else 500 - return JSONResponse(status_code=status_code, content={ - 'success': skip_result, - }) + return JSONResponse( + status_code=status_code, + content={ + "success": skip_result, + }, + ) except Exception as e: traceback.print_exc() - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'General failure.', - }) - - - async def radio_reshuffle(self, data: ValidRadioReshuffleRequest, - request: Request) -> JSONResponse: + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "General failure.", + }, + ) + + async def radio_reshuffle( + self, data: ValidRadioReshuffleRequest, request: Request + ) -> JSONResponse: """ Reshuffle the play queue - **key**: API key """ if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): raise HTTPException(status_code=403, detail="Unauthorized") - + random.shuffle(self.radio_util.active_playlist) - return JSONResponse(content={ - 'ok': True - }) - - - async def radio_get_queue(self, request: Request, - limit: Optional[int] = 15_000) -> JSONResponse: + return JSONResponse(content={"ok": True}) + + async def radio_get_queue( + self, request: Request, limit: Optional[int] = 15_000 + ) -> JSONResponse: """ Get current play queue, up to limit [default: 15k] - **limit**: Number of queue items to return, default 15k @@ -104,23 +124,24 @@ class Radio(FastAPI): queue: list = self.radio_util.active_playlist[0:limit] queue_out: list[dict] = [] for x, item in enumerate(queue): - queue_out.append({ - 'pos': x, - 'id': item.get('id'), - 'uuid': item.get('uuid'), - 'artist': item.get('artist'), - 'song': item.get('song'), - 'album': item.get('album', 'N/A'), - 'genre': item.get('genre', 'N/A'), - 'artistsong': item.get('artistsong'), - 'duration': item.get('duration'), - }) - return JSONResponse(content={ - 'items': queue_out - }) - - async def radio_queue_shift(self, data: ValidRadioQueueShiftRequest, - request: Request) -> JSONResponse: + queue_out.append( + { + "pos": x, + "id": item.get("id"), + "uuid": item.get("uuid"), + "artist": item.get("artist"), + "song": item.get("song"), + "album": item.get("album", "N/A"), + "genre": item.get("genre", "N/A"), + "artistsong": item.get("artistsong"), + "duration": item.get("duration"), + } + ) + return JSONResponse(content={"items": queue_out}) + + async def radio_queue_shift( + self, data: ValidRadioQueueShiftRequest, request: Request + ) -> JSONResponse: """ Shift position of a UUID within the queue [currently limited to playing next or immediately] @@ -130,24 +151,30 @@ class Radio(FastAPI): """ if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): raise HTTPException(status_code=403, detail="Unauthorized") - + queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid) if not queue_item: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Queue item not found.', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Queue item not found.", + }, + ) (x, item) = queue_item - self.radio_util.active_playlist.pop(x) + self.radio_util.active_playlist.pop(x) self.radio_util.active_playlist.insert(0, item) if not data.next: await self.radio_util._ls_skip() - return JSONResponse(content={ - 'ok': True, - }) - - async def radio_queue_remove(self, data: ValidRadioQueueRemovalRequest, - request: Request) -> JSONResponse: + return JSONResponse( + content={ + "ok": True, + } + ) + + async def radio_queue_remove( + self, data: ValidRadioQueueRemovalRequest, request: Request + ) -> JSONResponse: """ Remove an item from the current play queue - **key**: API key @@ -155,19 +182,26 @@ class Radio(FastAPI): """ if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): raise HTTPException(status_code=403, detail="Unauthorized") - + queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid) if not queue_item: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Queue item not found.', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Queue item not found.", + }, + ) self.radio_util.active_playlist.pop(queue_item[0]) - return JSONResponse(content={ - 'ok': True, - }) - - async def album_art_handler(self, request: Request, track_id: Optional[int] = None) -> Response: + return JSONResponse( + content={ + "ok": True, + } + ) + + async def album_art_handler( + self, request: Request, track_id: Optional[int] = None + ) -> Response: """ Get album art, optional parameter track_id may be specified. Otherwise, current track album art will be pulled. @@ -175,35 +209,42 @@ class Radio(FastAPI): """ try: if not track_id: - track_id = self.radio_util.now_playing.get('id') + track_id = self.radio_util.now_playing.get("id") logging.debug("Seeking album art with trackId: %s", track_id) - album_art: Optional[bytes] = await self.radio_util.get_album_art(track_id=track_id) + album_art: Optional[bytes] = await self.radio_util.get_album_art( + track_id=track_id + ) if not album_art: - return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg", - status_code=302) - return Response(content=album_art, - media_type="image/png") + return RedirectResponse( + url="https://codey.lol/images/radio_art_default.jpg", + status_code=302, + ) + return Response(content=album_art, media_type="image/png") except Exception as e: traceback.print_exc() - return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg", - status_code=302) - + return RedirectResponse( + url="https://codey.lol/images/radio_art_default.jpg", status_code=302 + ) + async def radio_now_playing(self, request: Request) -> JSONResponse: """ Get currently playing track info """ ret_obj: dict = {**self.radio_util.now_playing} try: - ret_obj['elapsed'] = int(time.time()) - ret_obj['start'] + ret_obj["elapsed"] = int(time.time()) - ret_obj["start"] except KeyError: traceback.print_exc() - ret_obj['elapsed'] = 0 - ret_obj.pop('file_path') + ret_obj["elapsed"] = 0 + ret_obj.pop("file_path") return JSONResponse(content=ret_obj) - - - async def radio_get_next(self, data: ValidRadioNextRequest, request: Request, - background_tasks: BackgroundTasks) -> JSONResponse: + + async def radio_get_next( + self, + data: ValidRadioNextRequest, + request: Request, + background_tasks: BackgroundTasks, + ) -> JSONResponse: """ Get next track Track will be removed from the queue in the process. @@ -211,54 +252,65 @@ class Radio(FastAPI): - **skipTo**: Optional UUID to skip to """ if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): - raise HTTPException(status_code=403, detail="Unauthorized") - if not isinstance(self.radio_util.active_playlist, list) or not self.radio_util.active_playlist: + raise HTTPException(status_code=403, detail="Unauthorized") + if ( + not isinstance(self.radio_util.active_playlist, list) + or not self.radio_util.active_playlist + ): await self.radio_util.load_playlist() await self.radio_util._ls_skip() - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'General failure occurred, prompting playlist reload.', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "General failure occurred, prompting playlist reload.", + }, + ) next = self.radio_util.active_playlist.pop(0) if not isinstance(next, dict): logging.critical("next is of type: %s, reloading playlist...", type(next)) await self.radio_util.load_playlist() await self.radio_util._ls_skip() - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'General failure occurred, prompting playlist reload.', - }) - - duration: int = next['duration'] + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "General failure occurred, prompting playlist reload.", + }, + ) + + duration: int = next["duration"] time_started: int = int(time.time()) - time_ends: int = int(time_started + duration) - + time_ends: int = int(time_started + duration) + if len(self.radio_util.active_playlist) > 1: - self.radio_util.active_playlist.append(next) # Push to end of playlist + self.radio_util.active_playlist.append(next) # Push to end of playlist else: await self.radio_util.load_playlist() - + self.radio_util.now_playing = next - next['start'] = time_started - next['end'] = time_ends + next["start"] = time_started + next["end"] = time_ends try: background_tasks.add_task(self.radio_util.webhook_song_change, next) except Exception as e: traceback.print_exc() try: - if not await self.radio_util.get_album_art(file_path=next['file_path']): - album_art = await self.radio_util.get_album_art(file_path=next['file_path']) + if not await self.radio_util.get_album_art(file_path=next["file_path"]): + album_art = await self.radio_util.get_album_art( + file_path=next["file_path"] + ) if album_art: - await self.radio_util.cache_album_art(next['id'], album_art) + await self.radio_util.cache_album_art(next["id"], album_art) else: - logging.debug("Could not read album art for %s", - next['file_path']) + logging.debug("Could not read album art for %s", next["file_path"]) except: traceback.print_exc() return JSONResponse(content=next) - - async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> JSONResponse: + async def radio_request( + self, data: ValidRadioSongRequest, request: Request + ) -> JSONResponse: """ Song request handler - **key**: API key @@ -268,42 +320,52 @@ class Radio(FastAPI): - **alsoSkip**: If True, skips to the track; otherwise, track will be placed next up in queue """ if not self.util.check_key(path=request.url.path, req_type=4, key=data.key): - raise HTTPException(status_code=403, detail="Unauthorized") + raise HTTPException(status_code=403, detail="Unauthorized") artistsong: Optional[str] = data.artistsong artist: Optional[str] = data.artist song: Optional[str] = data.song if artistsong and (artist or song): - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request', - }) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) if not artistsong and (not artist or not song): - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request', - }) - - search: bool = await self.radio_util.search_playlist(artistsong=artistsong, - artist=artist, - song=song) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) + + search: bool = await self.radio_util.search_playlist( + artistsong=artistsong, artist=artist, song=song + ) if data.alsoSkip: await self.radio_util._ls_skip() - return JSONResponse(content={ - 'result': search - }) - - async def radio_typeahead(self, data: ValidRadioTypeaheadRequest, - request: Request) -> JSONResponse: + return JSONResponse(content={"result": search}) + + async def radio_typeahead( + self, data: ValidRadioTypeaheadRequest, request: Request + ) -> JSONResponse: """ Radio typeahead handler - **query**: Typeahead query """ if not isinstance(data.query, str): - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request.', - }) - typeahead: Optional[list[str]] = await self.radio_util.trackdb_typeahead(data.query) + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request.", + }, + ) + typeahead: Optional[list[str]] = await self.radio_util.trackdb_typeahead( + data.query + ) if not typeahead: return JSONResponse(content=[]) - return JSONResponse(content=typeahead) \ No newline at end of file + return JSONResponse(content=typeahead) diff --git a/endpoints/rand_msg.py b/endpoints/rand_msg.py index 071d707..f0786ce 100644 --- a/endpoints/rand_msg.py +++ b/endpoints/rand_msg.py @@ -6,20 +6,25 @@ from fastapi import FastAPI from fastapi.responses import JSONResponse from .constructors import RandMsgRequest + class RandMsg(FastAPI): """ Random Message Endpoint - """ - def __init__(self, app: FastAPI, - util, constants) -> None: + """ + + def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util self.constants = constants self.endpoint_name = "randmsg" - app.add_api_route(f"/{self.endpoint_name}", self.randmsg_handler, methods=["POST"]) - - async def randmsg_handler(self, data: Optional[RandMsgRequest] = None) -> JSONResponse: + app.add_api_route( + f"/{self.endpoint_name}", self.randmsg_handler, methods=["POST"] + ) + + async def randmsg_handler( + self, data: Optional[RandMsgRequest] = None + ) -> JSONResponse: """ Get a randomly generated message - **short**: Optional, if True, will limit length of returned random messages to <=126 characters (Discord restriction related) @@ -35,58 +40,61 @@ class RandMsg(FastAPI): match db_rand_selected: case 0: - randmsg_db_path: Union[str, LiteralString] = os.path.join("/usr/local/share", - "sqlite_dbs", "qajoke.db") # For qajoke db - db_query: str = "SELECT id, ('Q: ' || question || '
A: ' \ - || answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db + randmsg_db_path: Union[str, LiteralString] = os.path.join( + "/usr/local/share", "sqlite_dbs", "qajoke.db" + ) # For qajoke db + db_query: str = ( + "SELECT id, ('Q: ' || question || '
A: ' \ + || answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db + ) title_attr = "QA Joke DB" case 1 | 9: - randmsg_db_path = os.path.join("/usr/local/share", - "sqlite_dbs", - "randmsg.db") # For randmsg db + randmsg_db_path = os.path.join( + "/usr/local/share", "sqlite_dbs", "randmsg.db" + ) # For randmsg db db_query = "SELECT id, msg FROM msgs WHERE \ - LENGTH(msg) <= 180 ORDER BY RANDOM() LIMIT 1" # For randmsg db + LENGTH(msg) <= 180 ORDER BY RANDOM() LIMIT 1" # For randmsg db if db_rand_selected == 9: db_query = db_query.replace("<= 180", "<= 126") title_attr = "Random Msg DB" case 2: - randmsg_db_path = os.path.join("/usr/local/share", - "sqlite_dbs", - "trump.db") # For Trump Tweet DB + randmsg_db_path = os.path.join( + "/usr/local/share", "sqlite_dbs", "trump.db" + ) # For Trump Tweet DB db_query = "SELECT id, content FROM tweets \ - ORDER BY RANDOM() LIMIT 1" # For Trump Tweet DB + ORDER BY RANDOM() LIMIT 1" # For Trump Tweet DB title_attr = "Trump Tweet DB" case 3: - randmsg_db_path = os.path.join("/usr/local/share", - "sqlite_dbs", - "philo.db") # For Philo DB + randmsg_db_path = os.path.join( + "/usr/local/share", "sqlite_dbs", "philo.db" + ) # For Philo DB db_query = "SELECT id, (content || '
- ' || speaker) FROM quotes \ ORDER BY RANDOM() LIMIT 1" # For Philo DB title_attr = "Philosophical Quotes DB" case 4: - randmsg_db_path = os.path.join("/usr/local/share", - "sqlite_dbs", - "hate.db") # For Hate DB + randmsg_db_path = os.path.join( + "/usr/local/share", "sqlite_dbs", "hate.db" + ) # For Hate DB db_query = """SELECT id, ("" || comment) FROM hate_speech \ WHERE length(comment) <= 180 ORDER BY RANDOM() LIMIT 1""" title_attr = "Hate Speech DB" case 5: - randmsg_db_path = os.path.join("/usr/local/share", - "sqlite_dbs", - "rjokes.db") # r/jokes DB + randmsg_db_path = os.path.join( + "/usr/local/share", "sqlite_dbs", "rjokes.db" + ) # r/jokes DB db_query = """SELECT id, (title || "
" || body) FROM jokes \ WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1""" title_attr = "r/jokes DB" - + async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db: async with await _db.execute(db_query) as _cursor: result: sqlite3.Row = await _cursor.fetchone() (result_id, result_msg) = result result_msg = result_msg.strip() - return JSONResponse(content= - { - "id": result_id, - "msg": result_msg, - "title": title_attr, - }) - \ No newline at end of file + return JSONResponse( + content={ + "id": result_id, + "msg": result_msg, + "title": title_attr, + } + ) diff --git a/endpoints/transcriptions.py b/endpoints/transcriptions.py index c272d12..3ad1d72 100644 --- a/endpoints/transcriptions.py +++ b/endpoints/transcriptions.py @@ -5,10 +5,12 @@ from fastapi.responses import JSONResponse from typing import Optional, LiteralString, Union from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest + class Transcriptions(FastAPI): """ Transcription Endpoints - """ + """ + def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util @@ -17,14 +19,17 @@ class Transcriptions(FastAPI): self.endpoints: dict = { "transcriptions/get_episodes": self.get_episodes_handler, "transcriptions/get_episode_lines": self.get_episode_lines_handler, - #tbd - } + # tbd + } for endpoint, handler in self.endpoints.items(): - app.add_api_route(f"/{endpoint}", handler, methods=["POST"], - include_in_schema=True) - - async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> JSONResponse: + app.add_api_route( + f"/{endpoint}", handler, methods=["POST"], include_in_schema=True + ) + + async def get_episodes_handler( + self, data: ValidShowEpisodeListRequest + ) -> JSONResponse: """ Get list of episodes by show id - **s**: Show ID to query @@ -33,56 +38,68 @@ class Transcriptions(FastAPI): db_path: Optional[Union[str, LiteralString]] = None db_query: Optional[str] = None show_title: Optional[str] = None - + if not isinstance(show_id, int): - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Invalid request', - }) - + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Invalid request", + }, + ) + show_id = int(show_id) - - if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Show not found.', - }) - + + if not (str(show_id).isnumeric()) or show_id not in [0, 1, 2]: + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Show not found.", + }, + ) + match show_id: case 0: - db_path = os.path.join("/usr/local/share", - "sqlite_dbs", "sp.db") + db_path = os.path.join("/usr/local/share", "sqlite_dbs", "sp.db") db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode""" show_title = "South Park" case 1: - db_path = os.path.join("/usr/local/share", - "sqlite_dbs", "futur.db") + db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db") db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP""" show_title = "Futurama" case 2: - db_path = os.path.join("/usr/local/share", - "sqlite_dbs", "parks.db") + db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db") db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP""" show_title = "Parks And Rec" case _: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Unknown error.', - }) - + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Unknown error.", + }, + ) + async with sqlite3.connect(database=db_path, timeout=1) as _db: async with await _db.execute(db_query) as _cursor: result: list[tuple] = await _cursor.fetchall() - return JSONResponse(content={ - "show_title": show_title, - "episodes": [ - { - 'id': item[1], - 'ep_friendly': item[0], - } for item in result], - }) - - async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> JSONResponse: + return JSONResponse( + content={ + "show_title": show_title, + "episodes": [ + { + "id": item[1], + "ep_friendly": item[0], + } + for item in result + ], + } + ) + + async def get_episode_lines_handler( + self, data: ValidShowEpisodeLineRequest + ) -> JSONResponse: """ Get lines for a particular episode - **s**: Show ID to query @@ -90,39 +107,46 @@ class Transcriptions(FastAPI): """ show_id: int = int(data.s) episode_id: int = int(data.e) - + match show_id: case 0: - db_path: Union[str, LiteralString] = os.path.join("/usr/local/share", - "sqlite_dbs", "sp.db") - db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?""" + db_path: Union[str, LiteralString] = os.path.join( + "/usr/local/share", "sqlite_dbs", "sp.db" + ) + db_query: str = ( + """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?""" + ) case 1: - db_path = os.path.join("/usr/local/share", - "sqlite_dbs", "futur.db") + db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db") db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "
Opener: " || EP_OPENER || ""), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC""" case 2: - db_path = os.path.join("/usr/local/share", - "sqlite_dbs", "parks.db") + db_path = os.path.join("/usr/local/share", "sqlite_dbs", "parks.db") db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC""" - + case _: - return JSONResponse(status_code=500, content={ - 'err': True, - 'errorText': 'Unknown error', - }) - - async with sqlite3.connect(database=db_path, - timeout=1) as _db: + return JSONResponse( + status_code=500, + content={ + "err": True, + "errorText": "Unknown error", + }, + ) + + async with sqlite3.connect(database=db_path, timeout=1) as _db: params: tuple = (episode_id,) async with await _db.execute(db_query, params) as _cursor: result: list[tuple] = await _cursor.fetchall() first_result: tuple = result[0] - return JSONResponse(content={ - 'episode_id': episode_id, - 'ep_friendly': first_result[0].strip(), - 'lines': [ - { - 'speaker': item[1].strip(), - 'line': item[2].strip(), - } for item in result], - }) \ No newline at end of file + return JSONResponse( + content={ + "episode_id": episode_id, + "ep_friendly": first_result[0].strip(), + "lines": [ + { + "speaker": item[1].strip(), + "line": item[2].strip(), + } + for item in result + ], + } + ) diff --git a/endpoints/yt.py b/endpoints/yt.py index c7c3abc..3223e7b 100644 --- a/endpoints/yt.py +++ b/endpoints/yt.py @@ -4,12 +4,13 @@ from fastapi.responses import JSONResponse from typing import Optional, Union from .constructors import ValidYTSearchRequest + class YT(FastAPI): """ YT Endpoints - """ - def __init__(self, app: FastAPI, util, - constants) -> None: + """ + + def __init__(self, app: FastAPI, util, constants) -> None: self.app: FastAPI = app self.util = util self.constants = constants @@ -17,28 +18,34 @@ class YT(FastAPI): self.endpoints: dict = { "yt/search": self.yt_video_search_handler, - } + } for endpoint, handler in self.endpoints.items(): - app.add_api_route(f"/{endpoint}", handler, methods=["POST"], - include_in_schema=True) - + app.add_api_route( + f"/{endpoint}", handler, methods=["POST"], include_in_schema=True + ) + async def yt_video_search_handler(self, data: ValidYTSearchRequest) -> JSONResponse: """ Search for YT Video by Title (closest match returned) - **t**: Title to search """ - + title: str = data.t yts_res: Optional[list[dict]] = await self.ytsearch.search(title) if not yts_res: - return JSONResponse(status_code=404, content={ - 'err': True, - 'errorText': 'No result.', - }) - yt_video_id: Union[str, bool] = yts_res[0].get('id', False) + return JSONResponse( + status_code=404, + content={ + "err": True, + "errorText": "No result.", + }, + ) + yt_video_id: Union[str, bool] = yts_res[0].get("id", False) - return JSONResponse(content={ - 'video_id': yt_video_id, - 'extras': yts_res[0], - }) \ No newline at end of file + return JSONResponse( + content={ + "video_id": yt_video_id, + "extras": yts_res[0], + } + ) diff --git a/gpt/__init__.py b/gpt/__init__.py index 1867d4f..4631fcd 100644 --- a/gpt/__init__.py +++ b/gpt/__init__.py @@ -1,6 +1,7 @@ from typing import Optional from openai import AsyncOpenAI + class GPT: def __init__(self, constants) -> None: self.constants = constants @@ -12,8 +13,9 @@ class GPT: self.default_system_prompt: str = """You are a helpful assistant who will provide only totally accurate tidbits of \ info on the specific songs the user may listen to.""" - async def get_completion(self, prompt: str, - system_prompt: Optional[str] = None) -> Optional[str]: + async def get_completion( + self, prompt: str, system_prompt: Optional[str] = None + ) -> Optional[str]: if not system_prompt: system_prompt = self.default_system_prompt chat_completion = await self.client.chat.completions.create( @@ -25,10 +27,10 @@ class GPT: { "role": "user", "content": prompt, - } + }, ], model="gpt-4o-mini", temperature=0.35, ) response: Optional[str] = chat_completion.choices[0].message.content - return response \ No newline at end of file + return response diff --git a/lyric_search/constructors.py b/lyric_search/constructors.py index 31f8526..73bf626 100644 --- a/lyric_search/constructors.py +++ b/lyric_search/constructors.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from typing import Union + @dataclass class LyricsResult: """ @@ -12,31 +13,37 @@ class LyricsResult: lyrics (Union[str, list]): str if plain lyrics, list for lrc time (float): time taken to retrieve lyrics from source """ + artist: str song: str src: str lyrics: Union[str, list] confidence: int time: float = 0.00 - + """ Generic """ + + class InvalidLyricSearchResponseException(Exception): pass + """ Genius """ -class InvalidGeniusResponseException( - InvalidLyricSearchResponseException): + + +class InvalidGeniusResponseException(InvalidLyricSearchResponseException): pass + """ LRCLib """ -class InvalidLRCLibResponseException( - InvalidLyricSearchResponseException): - pass \ No newline at end of file + +class InvalidLRCLibResponseException(InvalidLyricSearchResponseException): + pass diff --git a/lyric_search/sources/aggregate.py b/lyric_search/sources/aggregate.py index 9e44ec4..68a37f0 100644 --- a/lyric_search/sources/aggregate.py +++ b/lyric_search/sources/aggregate.py @@ -4,23 +4,26 @@ from lyric_search import notifier import sys import logging import traceback -sys.path.insert(1,'..') + +sys.path.insert(1, "..") from . import cache, redis_cache, genius, lrclib + class Aggregate: """ Aggregate all source methods """ - + def __init__(self, exclude_methods=None) -> None: if not exclude_methods: exclude_methods: list = [] self.exclude_methods = exclude_methods self.redis_cache = redis_cache.RedisCache() self.notifier = notifier.DiscordNotifier() - - async def search(self, artist: str, song: str, - plain: Optional[bool] = True) -> Optional[LyricsResult]: + + async def search( + self, artist: str, song: str, plain: Optional[bool] = True + ) -> Optional[LyricsResult]: """ Aggregate Search Args: @@ -41,37 +44,41 @@ class Aggregate: cache_search, lrclib_search, genius_search, - ] + ] if not plain: - sources = [lrclib_search] # Only LRCLib supported for synced lyrics + sources = [lrclib_search] # Only LRCLib supported for synced lyrics search_result: Optional[LyricsResult] = None for source in sources: if source.label.lower() in self.exclude_methods: if not plain: - logging.info("Exclude conditions rejected - source requested to exclude: %s, plain: %s", - source.label, plain) + logging.info( + "Exclude conditions rejected - source requested to exclude: %s, plain: %s", + source.label, + plain, + ) else: if plain: logging.info("Skipping source: %s, excluded.", source.label) continue - - search_result = await source.search(artist=artist, song=song, - plain=plain) + + search_result = await source.search(artist=artist, song=song, plain=plain) if search_result: break logging.info("%s: NOT FOUND!", source.label) if not search_result: - logging.info("%s - %s: all sources exhausted, not found.", - artist, song) - if plain: # do not record LRC fails - try: + logging.info("%s - %s: all sources exhausted, not found.", artist, song) + if plain: # do not record LRC fails + try: await self.redis_cache.increment_found_count("failed") - self.notifier.send("WARNING", - f"Could not find {artist} - {song} via queried sources.") + self.notifier.send( + "WARNING", + f"Could not find {artist} - {song} via queried sources.", + ) except Exception as e: traceback.print_exc() - logging.info("Could not increment redis failed counter: %s", - str(e)) - self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", - f"Could not increment redis failed counter: {str(e)}") - return search_result \ No newline at end of file + logging.info("Could not increment redis failed counter: %s", str(e)) + self.notifier.send( + f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", + f"Could not increment redis failed counter: {str(e)}", + ) + return search_result diff --git a/lyric_search/sources/cache.py b/lyric_search/sources/cache.py index 8a29696..4b07b98 100644 --- a/lyric_search/sources/cache.py +++ b/lyric_search/sources/cache.py @@ -4,8 +4,9 @@ import regex import logging import sys import traceback -sys.path.insert(1,'..') -sys.path.insert(1,'.') + +sys.path.insert(1, "..") +sys.path.insert(1, ".") from typing import Optional, Union, LiteralString import aiosqlite as sqlite3 from . import redis_cache @@ -15,27 +16,38 @@ from lyric_search.constructors import LyricsResult logger = logging.getLogger() log_level = logging.getLevelName(logger.level) + class Cache: """Cache Search Module""" + def __init__(self) -> None: - self.cache_db: Union[str, LiteralString] = os.path.join("/", "usr", "local", "share", - "sqlite_dbs", "cached_lyrics.db") + self.cache_db: Union[str, LiteralString] = os.path.join( + "/", "usr", "local", "share", "sqlite_dbs", "cached_lyrics.db" + ) self.redis_cache = redis_cache.RedisCache() self.notifier = notifier.DiscordNotifier() - - self.cache_pre_query: str = "pragma journal_mode = WAL; pragma synchronous = normal;\ + + self.cache_pre_query: str = ( + "pragma journal_mode = WAL; pragma synchronous = normal;\ pragma temp_store = memory; pragma mmap_size = 30000000000;" - self.sqlite_exts: list[str] = ['/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so'] + ) + self.sqlite_exts: list[str] = [ + "/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so" + ] self.label: str = "Cache" - def get_matched(self, matched_candidate: tuple, confidence: int, - sqlite_rows: Optional[list[sqlite3.Row]] = None, - redis_results: Optional[list] = None) -> Optional[LyricsResult]: + def get_matched( + self, + matched_candidate: tuple, + confidence: int, + sqlite_rows: Optional[list[sqlite3.Row]] = None, + redis_results: Optional[list] = None, + ) -> Optional[LyricsResult]: """ Get Matched Result Args: matched_candidate (tuple): the correctly matched candidate returned by matcher.best_match - confidence (int): % confidence + confidence (int): % confidence sqlite_rows (Optional[list[sqlite3.Row]]): List of returned rows from SQLite DB, or None if Redis redis_results (Any): List of Redis returned data, or None if SQLite Returns: @@ -47,11 +59,11 @@ class Cache: (key, row) = res if key == matched_id: return LyricsResult( - artist=row['artist'], - song=row['song'], - lyrics=row['lyrics'], + artist=row["artist"], + song=row["song"], + lyrics=row["lyrics"], src=f"{row['src']} (redis cache, id: {key})", - confidence=row['confidence'] + confidence=row["confidence"], ) else: for row in sqlite_rows: @@ -62,9 +74,10 @@ class Cache: song=song, lyrics=lyrics, src=f"{original_src} (cached, id: {_id})", - confidence=confidence) + confidence=confidence, + ) return None - + async def check_existence(self, artistsong: str) -> Optional[bool]: """ Check whether lyrics are already stored for track @@ -73,10 +86,13 @@ class Cache: Returns: bool: Whether track was found in cache """ - logging.debug("Checking whether %s is already stored", - artistsong.replace("\n", " - ")) - check_query: str = 'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\ + logging.debug( + "Checking whether %s is already stored", artistsong.replace("\n", " - ") + ) + check_query: str = ( + 'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\ <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1' + ) artistsong_split = artistsong.split("\n", maxsplit=1) artist = artistsong_split[0].lower() song = artistsong_split[1].lower() @@ -84,39 +100,45 @@ class Cache: async with sqlite3.connect(self.cache_db, timeout=2) as db_conn: await db_conn.enable_load_extension(True) for ext in self.sqlite_exts: - await db_conn.load_extension(ext) - async with await db_conn.executescript(self.cache_pre_query) as _db_cursor: + await db_conn.load_extension(ext) + async with await db_conn.executescript(self.cache_pre_query) as _db_cursor: async with await db_conn.execute(check_query, params) as db_cursor: result = await db_cursor.fetchone() if result: - logging.debug("%s is already stored.", - artistsong.replace("\n", " - ")) + logging.debug( + "%s is already stored.", artistsong.replace("\n", " - ") + ) return True - logging.debug("%s cleared to be stored.", - artistsong) + logging.debug("%s cleared to be stored.", artistsong) return False - + async def store(self, lyr_result: LyricsResult) -> None: """ Store lyrics (SQLite, then Redis) Args: lyr_result (LyricsResult): the returned lyrics to cache - Returns: None + Returns: None """ - + try: sqlite_insert_id = await self.sqlite_store(lyr_result) if sqlite_insert_id: await self.redis_cache.redis_store(sqlite_insert_id, lyr_result) except Exception as e: traceback.print_exc() - logging.error("ERROR @ %s: %s", - __file__.rsplit("/", maxsplit=1)[-1], f"cache::store >> {str(e)}") - await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", - f"cache::store >> `{str(e)}`") - - async def sqlite_rowcount(self, where: Optional[str] = None, - params: Optional[tuple] = None) -> int: + logging.error( + "ERROR @ %s: %s", + __file__.rsplit("/", maxsplit=1)[-1], + f"cache::store >> {str(e)}", + ) + await self.notifier.send( + f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", + f"cache::store >> `{str(e)}`", + ) + + async def sqlite_rowcount( + self, where: Optional[str] = None, params: Optional[tuple] = None + ) -> int: """ Get rowcount for cached_lyrics DB Args: @@ -130,8 +152,8 @@ class Cache: query = f"SELECT count(id) AS rowcount FROM lyrics {where}".strip() async with await db_conn.execute(query, params) as db_cursor: result = await db_cursor.fetchone() - return result['rowcount'] - + return result["rowcount"] + async def sqlite_distinct(self, column: str) -> int: """ Get count of distinct values for a column @@ -145,8 +167,8 @@ class Cache: query = f"SELECT COUNT(DISTINCT {column}) as distinct_items FROM lyrics" async with await db_conn.execute(query) as db_cursor: result = await db_cursor.fetchone() - return result['distinct_items'] - + return result["distinct_items"] + async def sqlite_lyrics_length(self) -> int: """ Get total length of text stored for lyrics @@ -160,9 +182,8 @@ class Cache: query = "SELECT SUM(LENGTH(lyrics)) as lyrics_len FROM lyrics" async with await db_conn.execute(query) as db_cursor: result = await db_cursor.fetchone() - return result['lyrics_len'] - - + return result["lyrics_len"] + async def sqlite_store(self, lyr_result: LyricsResult) -> int: """ Store lyrics to SQLite Cache @@ -172,30 +193,42 @@ class Cache: int: the inserted row id """ - logging.info("Storing %s", - f"{lyr_result.artist} - {lyr_result.song}") - + logging.info("Storing %s", f"{lyr_result.artist} - {lyr_result.song}") + if lyr_result.src.lower() == "cache": - logging.info("Skipping cache storage - returned LyricsResult originated from cache") + logging.info( + "Skipping cache storage - returned LyricsResult originated from cache" + ) return - + artistsong = f"{lyr_result.artist}\n{lyr_result.song}" if await self.check_existence(artistsong): - logging.info("Skipping cache storage - %s is already stored.", - artistsong.replace("\n", " - ")) + logging.info( + "Skipping cache storage - %s is already stored.", + artistsong.replace("\n", " - "), + ) return try: - lyrics = regex.sub(r'(
|\n|\r\n)', ' / ', lyr_result.lyrics.strip()) - lyrics = regex.sub(r'\s{2,}', ' ', lyrics) + lyrics = regex.sub(r"(
|\n|\r\n)", " / ", lyr_result.lyrics.strip()) + lyrics = regex.sub(r"\s{2,}", " ", lyrics) insert_query = "INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics)\ VALUES(?, ?, ?, ?, ?, ?, ?)" - params = (lyr_result.src, time.time(), lyr_result.artist, - lyr_result.song, artistsong, lyr_result.confidence, lyrics) + params = ( + lyr_result.src, + time.time(), + lyr_result.artist, + lyr_result.song, + artistsong, + lyr_result.confidence, + lyrics, + ) async with sqlite3.connect(self.cache_db, timeout=2) as db_conn: - async with await db_conn.executescript(self.cache_pre_query) as _db_cursor: + async with await db_conn.executescript( + self.cache_pre_query + ) as _db_cursor: async with await db_conn.execute(insert_query, params) as _cursor: await db_conn.commit() logging.info("Stored %s to SQLite!", artistsong.replace("\n", " - ")) @@ -203,7 +236,7 @@ class Cache: except: logging.critical("Cache storage error!") traceback.print_exc() - + async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]: """ Cache Search @@ -214,8 +247,8 @@ class Cache: Optional[LyricsResult]: The result, if found - None otherwise. """ try: - artist: str = artist.strip().lower() - song: str = song.strip().lower() + artist: str = artist.strip().lower() + song: str = song.strip().lower() input_track: str = f"{artist} - {song}" search_query = None search_params: Optional[tuple] = None @@ -225,87 +258,105 @@ class Cache: if artist == "!" and song == "!": random_search = True - search_query: str = 'SELECT id, artist, song, lyrics, src, confidence\ - FROM lyrics ORDER BY RANDOM() LIMIT 1' + search_query: str = ( + "SELECT id, artist, song, lyrics, src, confidence\ + FROM lyrics ORDER BY RANDOM() LIMIT 1" + ) + + logging.info("Searching %s - %s on %s", artist, song, self.label) - logging.info("Searching %s - %s on %s", - artist, song, self.label) - """Check Redis First""" - logging.debug("Checking redis cache for %s...", - f"{artist} - {song}") + logging.debug("Checking redis cache for %s...", f"{artist} - {song}") try: - redis_result = await self.redis_cache.search(artist=artist, - song=song) - + redis_result = await self.redis_cache.search(artist=artist, song=song) + if redis_result: result_tracks: list = [] for returned in redis_result: (key, track) = returned - result_tracks.append((key, f"{track['artist']} - {track['song']}")) - + result_tracks.append( + (key, f"{track['artist']} - {track['song']}") + ) + if not random_search: - best_match: Optional[tuple] = matcher.find_best_match(input_track=input_track, - candidate_tracks=result_tracks) + best_match: Optional[tuple] = matcher.find_best_match( + input_track=input_track, candidate_tracks=result_tracks + ) else: best_match = (result_tracks[0], 100) - - + if best_match: (candidate, confidence) = best_match - matched = self.get_matched(redis_results=redis_result, matched_candidate=candidate, - confidence=confidence) - + matched = self.get_matched( + redis_results=redis_result, + matched_candidate=candidate, + confidence=confidence, + ) + if matched and confidence >= 90: time_end: float = time.time() time_diff: float = time_end - time_start matched.confidence = confidence matched.time = time_diff - logging.info("Found %s on redis cache, skipping SQLite...", - f"{artist} - {song}") - await self.redis_cache.increment_found_count(self.label) + logging.info( + "Found %s on redis cache, skipping SQLite...", + f"{artist} - {song}", + ) + await self.redis_cache.increment_found_count(self.label) return matched except: pass - + """SQLite: Fallback""" - + async with sqlite3.connect(self.cache_db, timeout=2) as db_conn: await db_conn.enable_load_extension(True) for ext in self.sqlite_exts: await db_conn.load_extension(ext) - async with await db_conn.executescript(self.cache_pre_query) as _db_cursor: + async with await db_conn.executescript( + self.cache_pre_query + ) as _db_cursor: if not random_search: - search_query: str = 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\ + search_query: str = ( + 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\ WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\ <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 10' - search_params: tuple = (artist.strip(), song.strip(), - f"{artist.strip()} {song.strip()}") - - async with await _db_cursor.execute(search_query, search_params) as db_cursor: + ) + search_params: tuple = ( + artist.strip(), + song.strip(), + f"{artist.strip()} {song.strip()}", + ) + + async with await _db_cursor.execute( + search_query, search_params + ) as db_cursor: results: list = await db_cursor.fetchall() result_tracks: list = [] for track in results: (_id, _artist, _song, _lyrics, _src, _confidence) = track result_tracks.append((_id, f"{_artist} - {_song}")) if not random_search: - best_match: Optional[tuple] = matcher.find_best_match(input_track=input_track, - candidate_tracks=result_tracks) + best_match: Optional[tuple] = matcher.find_best_match( + input_track=input_track, candidate_tracks=result_tracks + ) else: best_match = (result_tracks[0], 100) if not best_match or confidence < 90: return None (candidate, confidence) = best_match logging.info("Result found on %s", self.label) - matched = self.get_matched(sqlite_rows=results, - matched_candidate=candidate, - confidence=confidence) + matched = self.get_matched( + sqlite_rows=results, + matched_candidate=candidate, + confidence=confidence, + ) time_end: float = time.time() time_diff: float = time_end - time_start matched.time = time_diff await self.redis_cache.increment_found_count(self.label) return matched except: - traceback.print_exc() \ No newline at end of file + traceback.print_exc() diff --git a/lyric_search/sources/common.py b/lyric_search/sources/common.py index 551f213..8f386f3 100644 --- a/lyric_search/sources/common.py +++ b/lyric_search/sources/common.py @@ -1,4 +1,4 @@ SCRAPE_HEADERS: dict[str, str] = { - 'accept': '*/*', - 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0', - } \ No newline at end of file + "accept": "*/*", + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0", +} diff --git a/lyric_search/sources/genius.py b/lyric_search/sources/genius.py index bfed68f..8e59257 100644 --- a/lyric_search/sources/genius.py +++ b/lyric_search/sources/genius.py @@ -1,29 +1,31 @@ import sys -sys.path.insert(1,'..') + +sys.path.insert(1, "..") import traceback import logging import time import re from typing import Optional from aiohttp import ClientTimeout, ClientSession -from bs4 import BeautifulSoup, ResultSet # type: ignore +from bs4 import BeautifulSoup, ResultSet # type: ignore import html as htm from . import private, common, cache, redis_cache from lyric_search import utils -from lyric_search.constructors import ( - LyricsResult, InvalidGeniusResponseException) +from lyric_search.constructors import LyricsResult, InvalidGeniusResponseException logger = logging.getLogger() log_level = logging.getLevelName(logger.level) + class Genius: """ Genius Search Module """ + def __init__(self) -> None: self.label: str = "Genius" self.genius_url: str = private.GENIUS_URL - self.genius_search_url: str = f'{self.genius_url}api/search/song?q=' + self.genius_search_url: str = f"{self.genius_url}api/search/song?q=" self.headers: dict = common.SCRAPE_HEADERS self.timeout = ClientTimeout(connect=3, sock_read=5) self.datautils = utils.DataUtils() @@ -31,8 +33,7 @@ class Genius: self.cache = cache.Cache() self.redis_cache = redis_cache.RedisCache() - async def search(self, artist: str, song: str, - **kwargs) -> Optional[LyricsResult]: + async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]: """ Genius Search Args: @@ -45,96 +46,125 @@ class Genius: artist: str = artist.strip().lower() song: str = song.strip().lower() time_start: float = time.time() - logging.info("Searching %s - %s on %s", - artist, song, self.label) - search_term: str = f'{artist}%20{song}' - returned_lyrics: str = '' + logging.info("Searching %s - %s on %s", artist, song, self.label) + search_term: str = f"{artist}%20{song}" + returned_lyrics: str = "" async with ClientSession() as client: - async with client.get(f'{self.genius_search_url}{search_term}', - timeout=self.timeout, - headers=self.headers) as request: + async with client.get( + f"{self.genius_search_url}{search_term}", + timeout=self.timeout, + headers=self.headers, + ) as request: request.raise_for_status() text: Optional[str] = await request.text() - + if not text: raise InvalidGeniusResponseException("No search response.") - + if len(text) < 100: - raise InvalidGeniusResponseException("Search response text was invalid (len < 100 chars.)") + raise InvalidGeniusResponseException( + "Search response text was invalid (len < 100 chars.)" + ) search_data = await request.json() - + if not isinstance(search_data, dict): raise InvalidGeniusResponseException("Invalid JSON.") - - if not isinstance(search_data['response'], dict): - raise InvalidGeniusResponseException(f"Invalid JSON: Cannot find response key.\n{search_data}") - - if not isinstance(search_data['response']['sections'], list): - raise InvalidGeniusResponseException(f"Invalid JSON: Cannot find response->sections key.\n{search_data}") - - if not isinstance(search_data['response']['sections'][0]['hits'], list): - raise InvalidGeniusResponseException("Invalid JSON: Cannot find response->sections[0]->hits key.") - - possible_matches: list = search_data['response']['sections'][0]['hits'] + + if not isinstance(search_data["response"], dict): + raise InvalidGeniusResponseException( + f"Invalid JSON: Cannot find response key.\n{search_data}" + ) + + if not isinstance(search_data["response"]["sections"], list): + raise InvalidGeniusResponseException( + f"Invalid JSON: Cannot find response->sections key.\n{search_data}" + ) + + if not isinstance( + search_data["response"]["sections"][0]["hits"], list + ): + raise InvalidGeniusResponseException( + "Invalid JSON: Cannot find response->sections[0]->hits key." + ) + + possible_matches: list = search_data["response"]["sections"][0][ + "hits" + ] to_scrape: list[tuple] = [ ( - returned['result']['path'], - f'{returned['result']['artist_names']} - {returned['result']['title']}', - ) for returned in possible_matches + returned["result"]["path"], + f"{returned['result']['artist_names']} - {returned['result']['title']}", + ) + for returned in possible_matches ] searched: str = f"{artist} - {song}" - best_match: tuple = self.matcher.find_best_match(input_track=searched, - candidate_tracks=to_scrape) + best_match: tuple = self.matcher.find_best_match( + input_track=searched, candidate_tracks=to_scrape + ) ((scrape_stub, track), confidence) = best_match - scrape_url: str = f'{self.genius_url}{scrape_stub[1:]}' - - async with client.get(scrape_url, - timeout=self.timeout, - headers=self.headers) as scrape_request: + scrape_url: str = f"{self.genius_url}{scrape_stub[1:]}" + + async with client.get( + scrape_url, timeout=self.timeout, headers=self.headers + ) as scrape_request: scrape_request.raise_for_status() scrape_text: Optional[str] = await scrape_request.text() - + if not scrape_text: raise InvalidGeniusResponseException("No scrape response.") - + if len(scrape_text) < 100: - raise InvalidGeniusResponseException("Scrape response was invalid (len < 100 chars.)") - - - html = BeautifulSoup(htm.unescape(scrape_text).replace('
', '\n'), "html.parser") - - header_tags_genius: Optional[ResultSet] = html.find_all(class_=re.compile(r'.*Header.*')) + raise InvalidGeniusResponseException( + "Scrape response was invalid (len < 100 chars.)" + ) + + html = BeautifulSoup( + htm.unescape(scrape_text).replace("
", "\n"), + "html.parser", + ) + + header_tags_genius: Optional[ResultSet] = html.find_all( + class_=re.compile(r".*Header.*") + ) if header_tags_genius: for tag in header_tags_genius: tag.extract() - - divs: Optional[ResultSet] = html.find_all("div", {"data-lyrics-container": "true"}) - + + divs: Optional[ResultSet] = html.find_all( + "div", {"data-lyrics-container": "true"} + ) + if not divs: return - + for div in divs: - header_tags: Optional[ResultSet] = div.find_all(['h1', 'h2', 'h3', 'h4', 'h5']) + header_tags: Optional[ResultSet] = div.find_all( + ["h1", "h2", "h3", "h4", "h5"] + ) if header_tags: for tag in header_tags: - tag.extract() + tag.extract() returned_lyrics += div.get_text() - - returned_lyrics: str = self.datautils.scrub_lyrics(returned_lyrics) + + returned_lyrics: str = self.datautils.scrub_lyrics( + returned_lyrics + ) artist: str = track.split(" - ", maxsplit=1)[0] song: str = track.split(" - ", maxsplit=1)[1] logging.info("Result found on %s", self.label) time_end: float = time.time() time_diff: float = time_end - time_start - matched = LyricsResult(artist=artist, - song=song, - src=self.label, - lyrics=returned_lyrics, - confidence=confidence, - time=time_diff) + matched = LyricsResult( + artist=artist, + song=song, + src=self.label, + lyrics=returned_lyrics, + confidence=confidence, + time=time_diff, + ) await self.redis_cache.increment_found_count(self.label) await self.cache.store(matched) return matched except: - traceback.print_exc() \ No newline at end of file + traceback.print_exc() diff --git a/lyric_search/sources/lrclib.py b/lyric_search/sources/lrclib.py index 3df970f..54b2167 100644 --- a/lyric_search/sources/lrclib.py +++ b/lyric_search/sources/lrclib.py @@ -1,6 +1,7 @@ import sys import time -sys.path.insert(1,'..') + +sys.path.insert(1, "..") import traceback import logging from typing import Optional, Union @@ -13,20 +14,23 @@ from lyric_search.constructors import InvalidLRCLibResponseException logger = logging.getLogger() log_level = logging.getLevelName(logger.level) + class LRCLib: """LRCLib Search Module""" + def __init__(self) -> None: self.label: str = "LRCLib" self.lrclib_url: str = "https://lrclib.net/api/search" self.headers: dict = common.SCRAPE_HEADERS self.timeout = ClientTimeout(connect=2, sock_read=4) self.datautils = utils.DataUtils() - self.matcher = utils.TrackMatcher() + self.matcher = utils.TrackMatcher() self.cache = cache.Cache() self.redis_cache = redis_cache.RedisCache() - async def search(self, artist: str, song: str, - plain: Optional[bool] = True) -> Optional[LyricsResult]: + async def search( + self, artist: str, song: str, plain: Optional[bool] = True + ) -> Optional[LyricsResult]: """ LRCLib Search Args: @@ -35,92 +39,124 @@ class LRCLib: Returns: Optional[LyricsResult]: The result, if found - None otherwise. """ - try: + try: artist: str = artist.strip().lower() song: str = song.strip().lower() time_start: float = time.time() lrc_obj: Optional[list[dict]] = None - logging.info("Searching %s - %s on %s", - artist, song, self.label) - - input_track: str = f"{artist} - {song}" - returned_lyrics: str = '' + logging.info("Searching %s - %s on %s", artist, song, self.label) + + input_track: str = f"{artist} - {song}" + returned_lyrics: str = "" async with ClientSession() as client: - async with await client.get(self.lrclib_url, - params = { - 'artist_name': artist, - 'track_name': song, - }, - timeout=self.timeout, - headers=self.headers) as request: + async with await client.get( + self.lrclib_url, + params={ + "artist_name": artist, + "track_name": song, + }, + timeout=self.timeout, + headers=self.headers, + ) as request: request.raise_for_status() - + text: Optional[str] = await request.text() if not text: - raise InvalidLRCLibResponseException("No search response.") + raise InvalidLRCLibResponseException("No search response.") if len(text) < 100: - raise InvalidLRCLibResponseException("Search response text was invalid (len < 100 chars.)") - + raise InvalidLRCLibResponseException( + "Search response text was invalid (len < 100 chars.)" + ) + search_data: Optional[Union[list, dict]] = await request.json() - if not isinstance(search_data, list|dict): + if not isinstance(search_data, list | dict): raise InvalidLRCLibResponseException("No JSON search data.") # logging.info("Search Data:\n%s", search_data) - + if not isinstance(search_data, list): raise InvalidLRCLibResponseException("Invalid JSON.") - + if plain: - possible_matches = [(x, f"{result.get('artistName')} - {result.get('trackName')}") - for x, result in enumerate(search_data)] + possible_matches = [ + ( + x, + f"{result.get('artistName')} - {result.get('trackName')}", + ) + for x, result in enumerate(search_data) + ] else: - logging.info("Limiting possible matches to only those with non-null syncedLyrics") - possible_matches = [(x, f"{result.get('artistName')} - {result.get('trackName')}") - for x, result in enumerate(search_data) if isinstance(result['syncedLyrics'], str)] + logging.info( + "Limiting possible matches to only those with non-null syncedLyrics" + ) + possible_matches = [ + ( + x, + f"{result.get('artistName')} - {result.get('trackName')}", + ) + for x, result in enumerate(search_data) + if isinstance(result["syncedLyrics"], str) + ] - - - best_match = self.matcher.find_best_match(input_track, - possible_matches)[0] + best_match = self.matcher.find_best_match( + input_track, possible_matches + )[0] if not best_match: return best_match_id = best_match[0] - - if not isinstance(search_data[best_match_id]['artistName'], str): - raise InvalidLRCLibResponseException(f"Invalid JSON: Cannot find artistName key.\n{search_data}") - - if not isinstance(search_data[best_match_id]['trackName'], str): - raise InvalidLRCLibResponseException(f"Invalid JSON: Cannot find trackName key.\n{search_data}") - - returned_artist: str = search_data[best_match_id]['artistName'] - returned_song: str = search_data[best_match_id]['trackName'] + + if not isinstance(search_data[best_match_id]["artistName"], str): + raise InvalidLRCLibResponseException( + f"Invalid JSON: Cannot find artistName key.\n{search_data}" + ) + + if not isinstance(search_data[best_match_id]["trackName"], str): + raise InvalidLRCLibResponseException( + f"Invalid JSON: Cannot find trackName key.\n{search_data}" + ) + + returned_artist: str = search_data[best_match_id]["artistName"] + returned_song: str = search_data[best_match_id]["trackName"] if plain: - if not isinstance(search_data[best_match_id]['plainLyrics'], str): - raise InvalidLRCLibResponseException(f"Invalid JSON: Cannot find plainLyrics key.\n{search_data}") - returned_lyrics: str = search_data[best_match_id]['plainLyrics'] + if not isinstance( + search_data[best_match_id]["plainLyrics"], str + ): + raise InvalidLRCLibResponseException( + f"Invalid JSON: Cannot find plainLyrics key.\n{search_data}" + ) + returned_lyrics: str = search_data[best_match_id]["plainLyrics"] returned_lyrics = self.datautils.scrub_lyrics(returned_lyrics) else: - if not isinstance(search_data[best_match_id]['syncedLyrics'], str): - raise InvalidLRCLibResponseException(f"Invalid JSON: Cannot find syncedLyrics key.\n{search_data}") - returned_lyrics: str = search_data[best_match_id]['syncedLyrics'] + if not isinstance( + search_data[best_match_id]["syncedLyrics"], str + ): + raise InvalidLRCLibResponseException( + f"Invalid JSON: Cannot find syncedLyrics key.\n{search_data}" + ) + returned_lyrics: str = search_data[best_match_id][ + "syncedLyrics" + ] lrc_obj = self.datautils.create_lrc_object(returned_lyrics) returned_track: str = f"{returned_artist} - {returned_song}" - (_matched, confidence) = self.matcher.find_best_match(input_track=input_track, - candidate_tracks=[(0, returned_track)]) + (_matched, confidence) = self.matcher.find_best_match( + input_track=input_track, candidate_tracks=[(0, returned_track)] + ) if not confidence: - return # No suitable match found + return # No suitable match found logging.info("Result found on %s", self.label) time_end: float = time.time() time_diff: float = time_end - time_start - matched = LyricsResult(artist=returned_artist, - song=returned_song, - src=self.label, - lyrics=returned_lyrics if plain else lrc_obj, - confidence=confidence, - time=time_diff) + matched = LyricsResult( + artist=returned_artist, + song=returned_song, + src=self.label, + lyrics=returned_lyrics if plain else lrc_obj, + confidence=confidence, + time=time_diff, + ) await self.redis_cache.increment_found_count(self.label) await self.cache.store(matched) return matched except: - traceback.print_exc() \ No newline at end of file + traceback.print_exc() diff --git a/lyric_search/sources/redis_cache.py b/lyric_search/sources/redis_cache.py index 655323e..61717df 100644 --- a/lyric_search/sources/redis_cache.py +++ b/lyric_search/sources/redis_cache.py @@ -7,24 +7,27 @@ import regex from regex import Pattern import asyncio from typing import Union, Optional -sys.path.insert(1,'..') + +sys.path.insert(1, "..") from lyric_search import notifier from lyric_search.constructors import LyricsResult import redis.asyncio as redis -from redis.commands.search.query import Query # type: ignore -from redis.commands.search.indexDefinition import IndexDefinition, IndexType # type: ignore -from redis.commands.search.field import TextField, TagField # type: ignore -from redis.commands.json.path import Path # type: ignore +from redis.commands.search.query import Query # type: ignore +from redis.commands.search.indexDefinition import IndexDefinition, IndexType # type: ignore +from redis.commands.search.field import TextField, TagField # type: ignore +from redis.commands.json.path import Path # type: ignore from . import private logger = logging.getLogger() log_level = logging.getLevelName(logger.level) + class RedisException(Exception): """ Redis Exception """ + class RedisCache: """ Redis Cache Methods @@ -35,34 +38,37 @@ class RedisCache: self.notifier = notifier.DiscordNotifier() self.notify_warnings = False self.regexes: list[Pattern] = [ - regex.compile(r'\-'), - regex.compile(r'[^a-zA-Z0-9\s]'), + regex.compile(r"\-"), + regex.compile(r"[^a-zA-Z0-9\s]"), ] try: asyncio.get_event_loop().create_task(self.create_index()) except Exception as e: - logging.debug("Failed to create redis create_index task: %s", - str(e)) - + logging.debug("Failed to create redis create_index task: %s", str(e)) + async def create_index(self) -> None: """Create Index""" try: schema = ( - TextField("$.search_artist", as_name="artist"), - TextField("$.search_song", as_name="song"), - TextField("$.src", as_name="src"), - TextField("$.lyrics", as_name="lyrics") - ) + TextField("$.search_artist", as_name="artist"), + TextField("$.search_song", as_name="song"), + TextField("$.src", as_name="src"), + TextField("$.lyrics", as_name="lyrics"), + ) result = await self.redis_client.ft().create_index( - schema, definition=IndexDefinition(prefix=["lyrics:"], index_type=IndexType.JSON)) + schema, + definition=IndexDefinition( + prefix=["lyrics:"], index_type=IndexType.JSON + ), + ) if str(result) != "OK": raise RedisException(f"Redis: Failed to create index: {result}") except Exception as e: - logging.debug("Failed to create redis index: %s", - str(e)) - - def sanitize_input(self, artist: str, song: str, - fuzzy: Optional[bool] = False) -> tuple[str, str]: + logging.debug("Failed to create redis index: %s", str(e)) + + def sanitize_input( + self, artist: str, song: str, fuzzy: Optional[bool] = False + ) -> tuple[str, str]: """ Sanitize artist/song input (convert to redis matchable fuzzy query) Args: @@ -77,10 +83,12 @@ class RedisCache: song = self.regexes[0].sub("", song) song = self.regexes[1].sub("", song).strip() if fuzzy: - artist = " ".join([f"(%{artist_word}%)" for artist_word in artist.split(" ")]) + artist = " ".join( + [f"(%{artist_word}%)" for artist_word in artist.split(" ")] + ) song = " ".join([f"(%{song_word}%)" for song_word in song.split(" ")]) return (artist, song) - + async def increment_found_count(self, src: str) -> None: """ Increment the found count for a source @@ -94,13 +102,13 @@ class RedisCache: await self.redis_client.incr(f"returned:{src}") except Exception as e: file: str = __file__.rsplit("/", maxsplit=1)[-1] - await self.notifier.send(f"ERROR @ {file}", str(e)) + await self.notifier.send(f"ERROR @ {file}", str(e)) traceback.print_exc() - + async def get_found_counts(self) -> Optional[dict]: """ Get found counts for all sources (and failed count) - + Returns: dict: In the form {'source': count, 'source2': count, ...} """ @@ -109,18 +117,20 @@ class RedisCache: counts: dict[str, int] = {} for src in sources: src_found_count = await self.redis_client.get(f"returned:{src}") - counts[src] = int(src_found_count) # Redis returns bytes + counts[src] = int(src_found_count) # Redis returns bytes return counts except Exception as e: file: str = __file__.rsplit("/", maxsplit=1)[-1] - await self.notifier.send(f"ERROR @ {file}", str(e)) + await self.notifier.send(f"ERROR @ {file}", str(e)) traceback.print_exc() return None - - - async def search(self, artist: Optional[str] = None, - song: Optional[str] = None, - lyrics: Optional[str] = None) -> Optional[list[tuple]]: + + async def search( + self, + artist: Optional[str] = None, + song: Optional[str] = None, + lyrics: Optional[str] = None, + ) -> Optional[list[tuple]]: """ Search Redis Cache Args: @@ -133,57 +143,72 @@ class RedisCache: try: fuzzy_artist = None - fuzzy_song = None + fuzzy_song = None is_random_search = artist == "!" and song == "!" if lyrics: # to code later raise RedisException("Lyric search not yet implemented") - + if not is_random_search: logging.debug("Redis: Searching normally first") if not artist or not song: - logging.info("redis_cache:: search failed: No artist or song provided.") + logging.info( + "redis_cache:: search failed: No artist or song provided." + ) return None (artist, song) = self.sanitize_input(artist, song) logging.debug("Seeking: %s - %s", artist, song) - search_res: Union[dict, list] = await self.redis_client.ft().search(Query( # type: ignore - f"@artist:{artist} @song:{song}" - )) - search_res_out: list[tuple] = [(result['id'].split(":", - maxsplit=1)[1], dict(json.loads(result['json']))) - for result in search_res.docs] # type: ignore + search_res: Union[dict, list] = await self.redis_client.ft().search( + Query(f"@artist:{artist} @song:{song}") # type: ignore + ) + search_res_out: list[tuple] = [ + ( + result["id"].split(":", maxsplit=1)[1], + dict(json.loads(result["json"])), + ) + for result in search_res.docs + ] # type: ignore if not search_res_out: - logging.debug("Redis: Normal search failed, trying with fuzzy search") - + logging.debug( + "Redis: Normal search failed, trying with fuzzy search" + ) + short_artist = " ".join(artist.split(" ")[0:5]) - short_song = " ".join(song.split(" ")[0:5]) - (fuzzy_artist, fuzzy_song) = self.sanitize_input(artist=short_artist.strip(), - song=short_song.strip(), fuzzy=True) - search_res = await self.redis_client.ft().search(Query( # type: ignore - f"@artist:{fuzzy_artist} @song:{fuzzy_song}" - )) - search_res_out = [(result['id'].split(":", - maxsplit=1)[1], dict(json.loads(result['json']))) - for result in search_res.docs] # type: ignore - + short_song = " ".join(song.split(" ")[0:5]) + (fuzzy_artist, fuzzy_song) = self.sanitize_input( + artist=short_artist.strip(), song=short_song.strip(), fuzzy=True + ) + search_res = await self.redis_client.ft().search( + Query( # type: ignore + f"@artist:{fuzzy_artist} @song:{fuzzy_song}" + ) + ) + search_res_out = [ + ( + result["id"].split(":", maxsplit=1)[1], + dict(json.loads(result["json"])), + ) + for result in search_res.docs + ] # type: ignore + else: random_redis_key: str = await self.redis_client.randomkey() - out_id: str = str(random_redis_key).split(":", - maxsplit=1)[1][:-1] + out_id: str = str(random_redis_key).split(":", maxsplit=1)[1][:-1] search_res = await self.redis_client.json().get(random_redis_key) search_res_out = [(out_id, search_res)] if not search_res_out and self.notify_warnings: - await self.notifier.send("WARNING", f"Redis cache miss for: `{artist} - {song}`") + await self.notifier.send( + "WARNING", f"Redis cache miss for: `{artist} - {song}`" + ) return search_res_out except Exception as e: traceback.print_exc() - # await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", f"{str(e)}\nSearch was: {artist} - {song}; fuzzy: {fuzzy_artist} - {fuzzy_song}") + # await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", f"{str(e)}\nSearch was: {artist} - {song}; fuzzy: {fuzzy_artist} - {fuzzy_song}") return None - - async def redis_store(self, sqlite_id: int, - lyr_result: LyricsResult) -> None: + + async def redis_store(self, sqlite_id: int, lyr_result: LyricsResult) -> None: """ Store lyrics to redis cache Args: @@ -193,34 +218,47 @@ class RedisCache: None """ try: - (search_artist, search_song) = self.sanitize_input(lyr_result.artist, - lyr_result.song) + (search_artist, search_song) = self.sanitize_input( + lyr_result.artist, lyr_result.song + ) redis_mapping: dict = { - 'id': sqlite_id, - 'src': lyr_result.src, - 'date_retrieved': time.time(), - 'artist': lyr_result.artist, - 'search_artist': search_artist, - 'search_song': search_song, - 'search_artistsong': f'{search_artist}\n{search_song}', - 'song': lyr_result.song, - 'artistsong': f"{lyr_result.artist}\n{lyr_result.song}", - 'confidence': lyr_result.confidence, - 'lyrics': lyr_result.lyrics, - 'tags': '(none)', - 'liked': 0, - } + "id": sqlite_id, + "src": lyr_result.src, + "date_retrieved": time.time(), + "artist": lyr_result.artist, + "search_artist": search_artist, + "search_song": search_song, + "search_artistsong": f"{search_artist}\n{search_song}", + "song": lyr_result.song, + "artistsong": f"{lyr_result.artist}\n{lyr_result.song}", + "confidence": lyr_result.confidence, + "lyrics": lyr_result.lyrics, + "tags": "(none)", + "liked": 0, + } newkey: str = f"lyrics:000{sqlite_id}" - jsonset: bool = await self.redis_client.json().set(newkey, Path.root_path(), - redis_mapping) + jsonset: bool = await self.redis_client.json().set( + newkey, Path.root_path(), redis_mapping + ) if not jsonset: - raise RedisException(f"Failed to store {lyr_result.artist} - {lyr_result.song} (SQLite id: {sqlite_id}) to redis:\n{jsonset}") - logging.info("Stored %s - %s (related SQLite Row ID: %s) to %s", - lyr_result.artist, lyr_result.song, sqlite_id, newkey) - await self.notifier.send("INFO", - f"Stored `{lyr_result.artist} - {lyr_result.song}` (related SQLite Row ID: `{sqlite_id}`) to redis: `{newkey}`") + raise RedisException( + f"Failed to store {lyr_result.artist} - {lyr_result.song} (SQLite id: {sqlite_id}) to redis:\n{jsonset}" + ) + logging.info( + "Stored %s - %s (related SQLite Row ID: %s) to %s", + lyr_result.artist, + lyr_result.song, + sqlite_id, + newkey, + ) + await self.notifier.send( + "INFO", + f"Stored `{lyr_result.artist} - {lyr_result.song}` (related SQLite Row ID: `{sqlite_id}`) to redis: `{newkey}`", + ) except Exception as e: file: str = __file__.rsplit("/", maxsplit=1)[-1] - await self.notifier.send(f"ERROR @ {file}", - f"Failed to store `{lyr_result.artist} - {lyr_result.song}`\ - (SQLite id: `{sqlite_id}`) to Redis:\n`{str(e)}`") \ No newline at end of file + await self.notifier.send( + f"ERROR @ {file}", + f"Failed to store `{lyr_result.artist} - {lyr_result.song}`\ + (SQLite id: `{sqlite_id}`) to Redis:\n`{str(e)}`", + ) diff --git a/lyric_search/utils.py b/lyric_search/utils.py index b63f06a..e6b2977 100644 --- a/lyric_search/utils.py +++ b/lyric_search/utils.py @@ -4,38 +4,41 @@ import logging import regex from regex import Pattern + class TrackMatcher: """Track Matcher""" + def __init__(self, threshold: float = 0.85): """ Initialize the TrackMatcher with a similarity threshold. - + Args: threshold (float): Minimum similarity score to consider a match valid (between 0 and 1, default 0.85) """ self.threshold = threshold - def find_best_match(self, input_track: str, candidate_tracks: List[tuple[int|str, str]]) -> Optional[tuple]: + def find_best_match( + self, input_track: str, candidate_tracks: List[tuple[int | str, str]] + ) -> Optional[tuple]: """ Find the best matching track from the candidate list. - + Args: input_track (str): Input track in "ARTIST - SONG" format candidate_tracks (List[tuple[int|str, str]]): List of candidate tracks - + Returns: Optional[tuple[int, str, float]]: Tuple of (best matching track, similarity score) or None if no good match found """ - if not input_track or not candidate_tracks: return None # Normalize input track input_track = self._normalize_string(input_track) - + best_match = None best_score: float = 0.0 @@ -43,12 +46,16 @@ class TrackMatcher: normalized_candidate = self._normalize_string(candidate[1]) if normalized_candidate.strip().lower() == input_track.strip().lower(): return (candidate, 100.0) - + # Calculate various similarity scores exact_score = 1.0 if input_track == normalized_candidate else 0.0 - sequence_score = SequenceMatcher(None, input_track, normalized_candidate).ratio() - token_score = self._calculate_token_similarity(input_track, normalized_candidate) - + sequence_score = SequenceMatcher( + None, input_track, normalized_candidate + ).ratio() + token_score = self._calculate_token_similarity( + input_track, normalized_candidate + ) + # Take the maximum of the different scoring methods final_score = max(exact_score, sequence_score, token_score) @@ -59,7 +66,7 @@ class TrackMatcher: # Return the match only if it meets the threshold if best_score < self.threshold: return None - match: tuple = (best_match, round(best_score * 100)) + match: tuple = (best_match, round(best_score * 100)) return match def _normalize_string(self, text: str) -> str: @@ -72,9 +79,9 @@ class TrackMatcher: str: Normalized text """ # Remove special characters and convert to lowercase - text = regex.sub(r'[^\w\s-]', '', text).lower() + text = regex.sub(r"[^\w\s-]", "", text).lower() # Normalize spaces - text = ' '.join(text.split()) + text = " ".join(text.split()) return text def _calculate_token_similarity(self, str1: str, str2: str) -> float: @@ -88,28 +95,32 @@ class TrackMatcher: """ tokens1 = set(str1.split()) tokens2 = set(str2.split()) - + if not tokens1 or not tokens2: return 0.0 intersection = tokens1.intersection(tokens2) union = tokens1.union(tokens2) - + return len(intersection) / len(union) - + + class DataUtils: """ Data Utils """ def __init__(self) -> None: - self.lrc_regex = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}') - self.scrub_regex_1: Pattern = regex.compile(r'(\[.*?\])(\s){0,}(\:){0,1}') - self.scrub_regex_2: Pattern = regex.compile(r'(\d?)(Embed\b)', - flags=regex.IGNORECASE) - self.scrub_regex_3: Pattern = regex.compile(r'\n{2}') - self.scrub_regex_4: Pattern = regex.compile(r'[0-9]\b$') - + self.lrc_regex = regex.compile( + r"\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}" + ) + self.scrub_regex_1: Pattern = regex.compile(r"(\[.*?\])(\s){0,}(\:){0,1}") + self.scrub_regex_2: Pattern = regex.compile( + r"(\d?)(Embed\b)", flags=regex.IGNORECASE + ) + self.scrub_regex_3: Pattern = regex.compile(r"\n{2}") + self.scrub_regex_4: Pattern = regex.compile(r"[0-9]\b$") + def scrub_lyrics(self, lyrics: str) -> str: """ Lyric Scrub Regex Chain @@ -118,11 +129,11 @@ class DataUtils: Returns: str: Regex scrubbed lyrics """ - lyrics = self.scrub_regex_1.sub('', lyrics) - lyrics = self.scrub_regex_2.sub('', lyrics) - lyrics = self.scrub_regex_3.sub('\n', lyrics) # Gaps between verses - lyrics = self.scrub_regex_3.sub('', lyrics) - return lyrics + lyrics = self.scrub_regex_1.sub("", lyrics) + lyrics = self.scrub_regex_2.sub("", lyrics) + lyrics = self.scrub_regex_3.sub("\n", lyrics) # Gaps between verses + lyrics = self.scrub_regex_3.sub("", lyrics) + return lyrics def create_lrc_object(self, lrc_str: str) -> list[dict]: """ @@ -142,15 +153,21 @@ class DataUtils: if not reg_helper: continue reg_helper = reg_helper[0] - logging.debug("Reg helper: %s for line: %s; len: %s", - reg_helper, line, len(reg_helper)) + logging.debug( + "Reg helper: %s for line: %s; len: %s", + reg_helper, + line, + len(reg_helper), + ) _timetag = reg_helper[0] if not reg_helper[1].strip(): _words = "♪" else: _words = reg_helper[1].strip() - lrc_out.append({ - "timeTag": _timetag, - "words": _words, - }) - return lrc_out \ No newline at end of file + lrc_out.append( + { + "timeTag": _timetag, + "words": _words, + } + ) + return lrc_out diff --git a/py.typed b/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/util.py b/util.py index 06f9390..868b15a 100644 --- a/util.py +++ b/util.py @@ -10,7 +10,8 @@ class Utilities: """ API Utilities """ - def __init__(self, app: FastAPI, constants): + + def __init__(self, app: FastAPI, constants): self.constants = constants self.blocked_redirect_uri = "https://codey.lol" self.app = app @@ -22,35 +23,31 @@ class Utilities: logging.error("Rejected request: Blocked") return RedirectResponse(url=self.blocked_redirect_uri) - def get_no_endpoint_found(self, path: Optional[str] = None): + def get_no_endpoint_found(self, path: Optional[str] = None): """ Get 404 Response """ logging.error("Rejected request: No such endpoint") raise HTTPException(detail="Unknown endpoint", status_code=404) - def check_key(self, path: str, key: str, - req_type: int = 0) -> bool: + def check_key(self, path: str, key: str, req_type: int = 0) -> bool: """ Accepts path as an argument to allow fine tuning access for each API key, not currently in use. """ if not key or not key.startswith("Bearer "): return False - + _key: str = key.split("Bearer ", maxsplit=1)[1].strip() if not _key in self.constants.API_KEYS: return False - + if req_type == 2: return _key.startswith("PRV-") elif req_type == 4: return _key.startswith("RAD-") - - if path.lower().startswith("/xc/")\ - and not key.startswith("XC-"): - return False + + if path.lower().startswith("/xc/") and not key.startswith("XC-"): + return False return True - - \ No newline at end of file diff --git a/utils/constructors.py b/utils/constructors.py index 8771623..3e05c50 100644 --- a/utils/constructors.py +++ b/utils/constructors.py @@ -2,5 +2,6 @@ LastFM """ + class InvalidLastFMResponseException(Exception): - pass \ No newline at end of file + pass diff --git a/utils/lastfm_wrapper.py b/utils/lastfm_wrapper.py index 7885394..ba0f358 100644 --- a/utils/lastfm_wrapper.py +++ b/utils/lastfm_wrapper.py @@ -6,54 +6,60 @@ from aiohttp import ClientSession, ClientTimeout from constants import Constants from .constructors import InvalidLastFMResponseException + class LastFM: """LastFM Endpoints""" - def __init__(self, - noInit: Optional[bool] = False) -> None: + + def __init__(self, noInit: Optional[bool] = False) -> None: self.creds = Constants().LFM_CREDS self.api_base_url: str = "https://ws.audioscrobbler.com/2.0" - + async def search_artist(self, artist: Optional[str] = None) -> dict: """Search LastFM for an artist""" try: if not artist: return { - 'err': 'No artist specified.', + "err": "No artist specified.", } - + request_params: list[tuple] = [ ("method", "artist.getInfo"), ("artist", artist), - ("api_key", self.creds.get('key')), + ("api_key", self.creds.get("key")), ("autocorrect", "1"), ("format", "json"), ] - - async with ClientSession() as session: - - async with await session.get(self.api_base_url, - params=request_params, - timeout=ClientTimeout(connect=3, sock_read=8)) as request: + + async with ClientSession() as session: + + async with await session.get( + self.api_base_url, + params=request_params, + timeout=ClientTimeout(connect=3, sock_read=8), + ) as request: request.raise_for_status() data: dict = await request.json() - data = data.get('artist', 'N/A') - + data = data.get("artist", "N/A") + ret_obj: dict = { - 'id': data.get('mbid'), - 'touring': data.get('ontour'), - 'name': data.get('name'), - 'bio': data.get('bio', None).get('summary').strip()\ - .split(" Optional[dict]: + + async def get_track_info( + self, artist: Optional[str] = None, track: Optional[str] = None + ) -> Optional[dict]: """ Get Track Info from LastFM Args: @@ -66,12 +72,12 @@ class LastFM: if not artist or not track: logging.info("inv request") return { - 'err': 'Invalid/No artist or track specified', + "err": "Invalid/No artist or track specified", } - + request_params: list[tuple] = [ ("method", "track.getInfo"), - ("api_key", self.creds.get('key')), + ("api_key", self.creds.get("key")), ("autocorrect", "1"), ("artist", artist), ("track", track), @@ -79,29 +85,32 @@ class LastFM: ] async with ClientSession() as session: - async with await session.get(self.api_base_url, - params=request_params, - timeout=ClientTimeout(connect=3, sock_read=8)) as request: + async with await session.get( + self.api_base_url, + params=request_params, + timeout=ClientTimeout(connect=3, sock_read=8), + ) as request: request.raise_for_status() data: dict = await request.json() - data = data.get('track', None) - if not isinstance(data.get('artist'), dict): + data = data.get("track", None) + if not isinstance(data.get("artist"), dict): return None - artist_mbid: int = data.get('artist', None).get('mbid') - album: str = data.get('album', None).get('title') + artist_mbid: int = data.get("artist", None).get("mbid") + album: str = data.get("album", None).get("title") ret_obj: dict = { - 'artist_mbid': artist_mbid, - 'album': album, + "artist_mbid": artist_mbid, + "album": album, } return ret_obj except: traceback.print_exc() return { - 'err': 'General Failure', + "err": "General Failure", } - - async def get_album_tracklist(self, artist: Optional[str] = None, - album: Optional[str] = None) -> dict: + + async def get_album_tracklist( + self, artist: Optional[str] = None, album: Optional[str] = None + ) -> dict: """ Get Album Tracklist Args: @@ -113,24 +122,26 @@ class LastFM: try: if not artist or not album: return { - 'err': 'No artist or album specified', + "err": "No artist or album specified", } - + tracks: dict = await self.get_release(artist=artist, album=album) - tracks = tracks.get('tracks', None) + tracks = tracks.get("tracks", None) ret_obj: dict = { - 'tracks': tracks, - } - + "tracks": tracks, + } + return ret_obj - + except: traceback.print_exc() return { - 'err': 'General Failure', - } - - async def get_artist_albums(self, artist: Optional[str] = None) -> Union[dict, list[dict]]: + "err": "General Failure", + } + + async def get_artist_albums( + self, artist: Optional[str] = None + ) -> Union[dict, list[dict]]: """ Get Artists Albums from LastFM Args: @@ -141,37 +152,39 @@ class LastFM: try: if not artist: return { - 'err': 'No artist specified.', + "err": "No artist specified.", } - + request_params: list[tuple] = [ ("method", "artist.gettopalbums"), ("artist", artist), - ("api_key", self.creds.get('key')), + ("api_key", self.creds.get("key")), ("autocorrect", "1"), ("format", "json"), ] - + async with ClientSession() as session: - async with await session.get(self.api_base_url, - params=request_params, - timeout=ClientTimeout(connect=3, sock_read=8)) as request: + async with await session.get( + self.api_base_url, + params=request_params, + timeout=ClientTimeout(connect=3, sock_read=8), + ) as request: request.raise_for_status() json_data: dict = await request.json() - data: dict = json_data.get('topalbums', None).get('album') + data: dict = json_data.get("topalbums", None).get("album") ret_obj: list = [ - { - 'title': item.get('name') - } for item in data if not(item.get('name').lower() == "(null)")\ - and int(item.get('playcount')) >= 50 + {"title": item.get("name")} + for item in data + if not (item.get("name").lower() == "(null)") + and int(item.get("playcount")) >= 50 ] - return ret_obj + return ret_obj except: traceback.print_exc() return { - 'err': 'Failed', + "err": "Failed", } - + async def get_artist_id(self, artist: Optional[str] = None) -> int: """ Get Artist ID from LastFM @@ -183,16 +196,16 @@ class LastFM: try: if not artist: return -1 - artist_search: dict = await self.search_artist(artist=artist) + artist_search: dict = await self.search_artist(artist=artist) if not artist_search: logging.debug("[get_artist_id] Throwing no result error") return -1 - artist_id: int = int(artist_search[0].get('id', 0)) + artist_id: int = int(artist_search[0].get("id", 0)) return artist_id except: traceback.print_exc() return -1 - + async def get_artist_info_by_id(self, artist_id: Optional[int] = None) -> dict: """ Get Artist info by ID from LastFM @@ -204,44 +217,48 @@ class LastFM: try: if not artist_id or not str(artist_id).isnumeric(): return { - 'err': 'Invalid/no artist_id specified.', + "err": "Invalid/no artist_id specified.", } - + req_url: str = f"{self.api_base_url}/artists/{artist_id}" - + request_params: list[tuple] = [ - ("key", self.creds.get('key')), - ("secret", self.creds.get('secret')), + ("key", self.creds.get("key")), + ("secret", self.creds.get("secret")), ] - + async with ClientSession() as session: - async with await session.get(req_url, - params=request_params, - timeout=ClientTimeout(connect=3, sock_read=8)) as request: + async with await session.get( + req_url, + params=request_params, + timeout=ClientTimeout(connect=3, sock_read=8), + ) as request: request.raise_for_status() data: dict = await request.json() - if not data.get('profile'): - raise InvalidLastFMResponseException("Data did not contain 'profile' key.") - - _id: int = data.get('id', None) - name: str = data.get('name', None) - profile: str = data.get('profile', '') + if not data.get("profile"): + raise InvalidLastFMResponseException( + "Data did not contain 'profile' key." + ) + + _id: int = data.get("id", None) + name: str = data.get("name", None) + profile: str = data.get("profile", "") profile = regex.sub(r"(\[(\/{0,})(u|b|i)])", "", profile) - members: list = data.get('members', None) - + members: list = data.get("members", None) + ret_obj: dict = { - 'id': _id, - 'name': name, - 'profile': profile, - 'members': members, + "id": _id, + "name": name, + "profile": profile, + "members": members, } return ret_obj except: traceback.print_exc() return { - 'err': 'Failed', + "err": "Failed", } - + async def get_artist_info(self, artist: Optional[str] = None) -> dict: """ Get Artist Info from LastFM @@ -253,27 +270,30 @@ class LastFM: try: if not artist: return { - 'err': 'No artist specified.', + "err": "No artist specified.", } artist_id: Optional[int] = await self.get_artist_id(artist=artist) if not artist_id: return { - 'err': 'Failed', + "err": "Failed", } - artist_info: Optional[dict] = await self.get_artist_info_by_id(artist_id=artist_id) + artist_info: Optional[dict] = await self.get_artist_info_by_id( + artist_id=artist_id + ) if not artist_info: return { - 'err': 'Failed', + "err": "Failed", } return artist_info except: traceback.print_exc() return { - 'err': 'Failed', + "err": "Failed", } - - async def get_release(self, artist: Optional[str] = None, - album: Optional[str] = None) -> dict: + + async def get_release( + self, artist: Optional[str] = None, album: Optional[str] = None + ) -> dict: """ Get Release info from LastFM Args: @@ -285,55 +305,61 @@ class LastFM: try: if not artist or not album: return { - 'err': 'Invalid artist/album pair', + "err": "Invalid artist/album pair", } - + request_params: list[tuple] = [ ("method", "album.getinfo"), ("artist", artist), ("album", album), - ("api_key", self.creds.get('key')), + ("api_key", self.creds.get("key")), ("autocorrect", "1"), ("format", "json"), ] async with ClientSession() as session: - async with await session.get(self.api_base_url, - params=request_params, - timeout=ClientTimeout(connect=3, sock_read=8)) as request: + async with await session.get( + self.api_base_url, + params=request_params, + timeout=ClientTimeout(connect=3, sock_read=8), + ) as request: request.raise_for_status() json_data: dict = await request.json() - data: dict = json_data.get('album', None) + data: dict = json_data.get("album", None) ret_obj: dict = { - 'id': data.get('mbid'), - 'artists': data.get('artist'), - 'tags': data.get('tags'), - 'title': data.get('name'), - 'summary': data.get('wiki', None).get('summary').split(" None: self.constants = constants self.gpt = gpt.GPT(self.constants) - self.ls_uri: str = self.constants.LS_URI - self.sqlite_exts: list[str] = ['/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so'] - self.active_playlist_path: Union[str, LiteralString] = os.path\ - .join("/usr/local/share", - "sqlite_dbs", "track_file_map.db") - self.active_playlist_name = "default" # not used + self.ls_uri: str = self.constants.LS_URI + self.sqlite_exts: list[str] = [ + "/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so" + ] + self.active_playlist_path: Union[str, LiteralString] = os.path.join( + "/usr/local/share", "sqlite_dbs", "track_file_map.db" + ) + self.active_playlist_name = "default" # not used self.active_playlist: list[dict] = [] self.now_playing: dict = { - 'artist': 'N/A', - 'song': 'N/A', - 'album': 'N/A', - 'genre': 'N/A', - 'artistsong': 'N/A - N/A', - 'duration': 0, - 'start': 0, - 'end': 0, - 'file_path': None, - 'id': None, - } + "artist": "N/A", + "song": "N/A", + "album": "N/A", + "genre": "N/A", + "artistsong": "N/A - N/A", + "duration": 0, + "start": 0, + "end": 0, + "file_path": None, + "id": None, + } self.webhooks: dict = { - 'gpt': { - 'hook': self.constants.GPT_WEBHOOK, - }, - 'sfm': { - 'hook': self.constants.SFM_WEBHOOK, - } - } - - def duration_conv(self, - s: Union[int, float]) -> str: + "gpt": { + "hook": self.constants.GPT_WEBHOOK, + }, + "sfm": { + "hook": self.constants.SFM_WEBHOOK, + }, + } + + def duration_conv(self, s: Union[int, float]) -> str: """ Convert duration given in seconds to hours, minutes, and seconds (h:m:s) Args: @@ -58,14 +61,12 @@ class RadioUtil: Returns: str """ - return str(datetime.timedelta(seconds=s))\ - .split(".", maxsplit=1)[0] - + return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0] + async def trackdb_typeahead(self, query: str) -> Optional[list[str]]: if not query: return None - async with sqlite3.connect(self.active_playlist_path, - timeout=1) as _db: + async with sqlite3.connect(self.active_playlist_path, timeout=1) as _db: _db.row_factory = sqlite3.Row db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\ (TRIM(artist) || " - " || TRIM(song)) as artistsong FROM tracks WHERE\ @@ -73,14 +74,15 @@ class RadioUtil: db_params: tuple[str] = (f"%{query}%",) async with _db.execute(db_query, db_params) as _cursor: result: Iterable[sqlite3.Row] = await _cursor.fetchall() - out_result = [ - str(r['artistsong']) for r in result - ] + out_result = [str(r["artistsong"]) for r in result] return out_result - - async def search_playlist(self, artistsong: Optional[str] = None, - artist: Optional[str] = None, - song: Optional[str] = None) -> bool: + + async def search_playlist( + self, + artistsong: Optional[str] = None, + artist: Optional[str] = None, + song: Optional[str] = None, + ) -> bool: """ Search for track, add it up next in play queue if found Args: @@ -95,9 +97,11 @@ class RadioUtil: if not artistsong and (not artist or not song): raise RadioException("No query provided") try: - search_query: str = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, album, genre, file_path, duration FROM tracks\ + search_query: str = ( + 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, album, genre, file_path, duration FROM tracks\ WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\ <= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1' + ) if artistsong: artistsong_split: list = artistsong.split(" - ", maxsplit=1) (search_artist, search_song) = tuple(artistsong_split) @@ -106,26 +110,31 @@ class RadioUtil: search_song = song if not artistsong: artistsong = f"{search_artist} - {search_song}" - search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),) - async with sqlite3.connect(self.active_playlist_path, - timeout=2) as db_conn: + search_params = ( + search_artist.lower(), + search_song.lower(), + artistsong.lower(), + ) + async with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn: await db_conn.enable_load_extension(True) for ext in self.sqlite_exts: await db_conn.load_extension(ext) db_conn.row_factory = sqlite3.Row - async with await db_conn.execute(search_query, search_params) as db_cursor: - result: Optional[sqlite3.Row|bool] = await db_cursor.fetchone() + async with await db_conn.execute( + search_query, search_params + ) as db_cursor: + result: Optional[sqlite3.Row | bool] = await db_cursor.fetchone() if not result or not isinstance(result, sqlite3.Row): return False pushObj: dict = { - 'id': result['id'], - 'uuid': str(uuid().hex), - 'artist': result['artist'].strip(), - 'song': result['song'].strip(), - 'artistsong': result['artistsong'].strip(), - 'genre': result['genre'], - 'file_path': result['file_path'], - 'duration': result['duration'], + "id": result["id"], + "uuid": str(uuid().hex), + "artist": result["artist"].strip(), + "song": result["song"].strip(), + "artistsong": result["artistsong"].strip(), + "genre": result["genre"], + "file_path": result["file_path"], + "duration": result["duration"], } self.active_playlist.insert(0, pushObj) return True @@ -133,7 +142,7 @@ class RadioUtil: logging.critical("search_playlist:: Search error occurred: %s", str(e)) traceback.print_exc() return False - + async def load_playlist(self) -> None: """Load Playlist""" try: @@ -141,11 +150,11 @@ class RadioUtil: self.active_playlist.clear() # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\ # GROUP BY artistdashsong ORDER BY RANDOM()' - + """ LIMITED GENRES """ - + db_query: str = """SELECT distinct(LOWER(TRIM(artist)) || " - " || LOWER(TRIM(song))), (TRIM(artist) || " - " || TRIM(song)) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\ WHERE (genre LIKE "%metalcore%"\ OR genre LIKE "%math rock%"\ @@ -176,44 +185,57 @@ class RadioUtil: OR genre LIKE "%indie pop%"\ OR genre LIKE "%dnb%")\ GROUP BY artistdashsong ORDER BY RANDOM()""" - + """ LIMITED TO ONE/SMALL SUBSET OF GENRES """ - - db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\ - WHERE (artist LIKE "%winds of plague%" OR artist LIKE "%acacia st%" OR artist LIKE "%suicide si%" OR artist LIKE "%in dying%") AND (NOT song LIKE "%(live%") ORDER BY RANDOM()' #ORDER BY artist DESC, album ASC, song ASC' - + + # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\ + # WHERE (artist LIKE "%winds of plague%" OR artist LIKE "%acacia st%" OR artist LIKE "%suicide si%" OR artist LIKE "%in dying%") AND (NOT song LIKE "%(live%") ORDER BY RANDOM()' #ORDER BY artist DESC, album ASC, song ASC' + """ LIMITED TO ONE/SOME ARTISTS... """ - + # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\ - # WHERE (artist LIKE "%we butter%" OR artist LIKE "%eisbrecher%" OR artist LIKE "%black ang%" OR artist LIKE "%madison affair%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC' - - async with sqlite3.connect(self.active_playlist_path, - timeout=2) as db_conn: + # WHERE (artist LIKE "%rise against%" OR artist LIKE "%i prevail%" OR artist LIKE "%volumes%" OR artist LIKE "%movements%" OR artist LIKE "%woe%" OR artist LIKE "%smittyztop%" OR artist LIKE "%chunk! no,%" OR artist LIKE "%fame on fire%" OR artist LIKE "%our last night%" OR artist LIKE "%animal in me%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC' + + # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\ + # WHERE (artist LIKE "%%" OR artist LIKE "%belmont%" OR artist LIKE "%in dying arms%" OR artist LIKE "%iwrestleda%" OR artist LIKE "%winds of p%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC' + + # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\ + # WHERE (artist LIKE "%akira the don%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC' + + async with sqlite3.connect( + f"file:{self.active_playlist_path}?mode=readonly", uri=True, timeout=2 + ) as db_conn: db_conn.row_factory = sqlite3.Row async with await db_conn.execute(db_query) as db_cursor: results: list[sqlite3.Row] = await db_cursor.fetchall() - self.active_playlist = [{ - 'uuid': str(uuid().hex), - 'id': r['id'], - 'artist': double_space.sub(' ', r['artist']).strip(), - 'song': double_space.sub(' ', r['song']).strip(), - 'album': double_space.sub(' ', r['album']).strip(), - 'genre': r['genre'] if r['genre'] else 'Unknown', - 'artistsong': double_space.sub(' ', r['artistdashsong']).strip(), - 'file_path': r['file_path'], - 'duration': r['duration'], - } for r in results] - logging.info("Populated active playlists with %s items", - len(self.active_playlist)) + self.active_playlist = [ + { + "uuid": str(uuid().hex), + "id": r["id"], + "artist": double_space.sub(" ", r["artist"]).strip(), + "song": double_space.sub(" ", r["song"]).strip(), + "album": double_space.sub(" ", r["album"]).strip(), + "genre": r["genre"] if r["genre"] else "Unknown", + "artistsong": double_space.sub( + " ", r["artistdashsong"] + ).strip(), + "file_path": r["file_path"], + "duration": r["duration"], + } + for r in results + ] + logging.info( + "Populated active playlists with %s items", + len(self.active_playlist), + ) except: traceback.print_exc() - - async def cache_album_art(self, track_id: int, - album_art: bytes) -> None: + + async def cache_album_art(self, track_id: int, album_art: bytes) -> None: """ Cache Album Art to SQLite DB Args: @@ -223,16 +245,21 @@ class RadioUtil: None """ try: - async with sqlite3.connect(self.active_playlist_path, - timeout=2) as db_conn: - async with await db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?", - (album_art, track_id,)) as db_cursor: + async with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn: + async with await db_conn.execute( + "UPDATE tracks SET album_art = ? WHERE id = ?", + ( + album_art, + track_id, + ), + ) as db_cursor: await db_conn.commit() except: traceback.print_exc() - - async def get_album_art(self, track_id: Optional[int] = None, - file_path: Optional[str] = None) -> Optional[bytes]: + + async def get_album_art( + self, track_id: Optional[int] = None, file_path: Optional[str] = None + ) -> Optional[bytes]: """ Get Album Art Args: @@ -242,28 +269,27 @@ class RadioUtil: bytes """ try: - async with sqlite3.connect(self.active_playlist_path, - timeout=2) as db_conn: + async with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn: db_conn.row_factory = sqlite3.Row query: str = "SELECT album_art FROM tracks WHERE id = ?" query_params: tuple = (track_id,) - + if file_path and not track_id: query = "SELECT album_art FROM tracks WHERE file_path = ?" query_params = (file_path,) - - async with await db_conn.execute(query, - query_params) as db_cursor: - result: Optional[Union[sqlite3.Row, bool]] = await db_cursor.fetchone() + + async with await db_conn.execute(query, query_params) as db_cursor: + result: Optional[Union[sqlite3.Row, bool]] = ( + await db_cursor.fetchone() + ) if not result or not isinstance(result, sqlite3.Row): return None - return result['album_art'] + return result["album_art"] except: traceback.print_exc() return None - - def get_queue_item_by_uuid(self, - uuid: str) -> Optional[tuple[int, dict]]: + + def get_queue_item_by_uuid(self, uuid: str) -> Optional[tuple[int, dict]]: """ Get queue item by UUID Args: @@ -272,10 +298,10 @@ class RadioUtil: Optional[tuple[int, dict]] """ for x, item in enumerate(self.active_playlist): - if item.get('uuid') == uuid: + if item.get("uuid") == uuid: return (x, item) return None - + async def _ls_skip(self) -> bool: """ Ask LiquidSoap server to skip to the next track @@ -286,18 +312,18 @@ class RadioUtil: """ try: async with ClientSession() as session: - async with session.get(f"{self.ls_uri}/next", - timeout=ClientTimeout(connect=2, sock_read=2)) as request: - request.raise_for_status() - text: Optional[str] = await request.text() - return text == "OK" + async with session.get( + f"{self.ls_uri}/next", timeout=ClientTimeout(connect=2, sock_read=2) + ) as request: + request.raise_for_status() + text: Optional[str] = await request.text() + return text == "OK" except Exception as e: - logging.debug("Skip failed: %s", str(e)) - - return False # failsafe - - async def get_ai_song_info(self, artist: str, - song: str) -> Optional[str]: + logging.debug("Skip failed: %s", str(e)) + + return False # failsafe + + async def get_ai_song_info(self, artist: str, song: str) -> Optional[str]: """ Get AI Song Info Args: @@ -312,44 +338,50 @@ class RadioUtil: logging.critical("No response received from GPT?") return None return response - + async def webhook_song_change(self, track: dict) -> None: try: """ Handles Song Change Outbounds (Webhooks) - Args: + Args: track (dict) Returns: None """ - - + # First, send track info - friendly_track_start: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['start'])) - friendly_track_end: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['end'])) + friendly_track_start: str = time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(track["start"]) + ) + friendly_track_end: str = time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(track["end"]) + ) hook_data: dict = { - 'username': 'serious.FM', - "embeds": [{ + "username": "serious.FM", + "embeds": [ + { "title": "Now Playing", - "description": f'## {track['song']}\nby\n## {track['artist']}', - "color": 0x30c56f, + "description": f"## {track['song']}\nby\n## {track['artist']}", + "color": 0x30C56F, "thumbnail": { "url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{int(time.time())}", }, "fields": [ { "name": "Duration", - "value": self.duration_conv(track['duration']), + "value": self.duration_conv(track["duration"]), "inline": True, }, { "name": "Genre", - "value": track['genre'] if track['genre'] else 'Unknown', + "value": ( + track["genre"] if track["genre"] else "Unknown" + ), "inline": True, }, { "name": "Filetype", - "value": track['file_path'].rsplit(".", maxsplit=1)[1], + "value": track["file_path"].rsplit(".", maxsplit=1)[1], "inline": True, }, { @@ -359,42 +391,56 @@ class RadioUtil: }, { "name": "Album", - "value": track['album'] if track['album'] else "Unknown", + "value": ( + track["album"] if track["album"] else "Unknown" + ), }, - ] - }] - } - - sfm_hook: str = self.webhooks['sfm'].get('hook') + ], + } + ], + } + + sfm_hook: str = self.webhooks["sfm"].get("hook") async with ClientSession() as session: - async with await session.post(sfm_hook, json=hook_data, - timeout=ClientTimeout(connect=5, sock_read=5), headers={ - 'content-type': 'application/json; charset=utf-8',}) as request: - request.raise_for_status() - - # Next, AI feedback - - ai_response: Optional[str] = await self.get_ai_song_info(track['artist'], - track['song']) + async with await session.post( + sfm_hook, + json=hook_data, + timeout=ClientTimeout(connect=5, sock_read=5), + headers={ + "content-type": "application/json; charset=utf-8", + }, + ) as request: + request.raise_for_status() + + # Next, AI feedback + + ai_response: Optional[str] = await self.get_ai_song_info( + track["artist"], track["song"] + ) if not ai_response: return - + hook_data = { - 'username': 'GPT', - "embeds": [{ + "username": "GPT", + "embeds": [ + { "title": "AI Feedback", - "color": 0x35d0ff, + "color": 0x35D0FF, "description": ai_response.strip(), - }] - } - - ai_hook: str = self.webhooks['gpt'].get('hook') + } + ], + } + + ai_hook: str = self.webhooks["gpt"].get("hook") async with ClientSession() as session: - async with await session.post(ai_hook, json=hook_data, - timeout=ClientTimeout(connect=5, sock_read=5), headers={ - 'content-type': 'application/json; charset=utf-8',}) as request: - request.raise_for_status() + async with await session.post( + ai_hook, + json=hook_data, + timeout=ClientTimeout(connect=5, sock_read=5), + headers={ + "content-type": "application/json; charset=utf-8", + }, + ) as request: + request.raise_for_status() except Exception as e: traceback.print_exc() - - \ No newline at end of file