significant refactor/cleanup
This commit is contained in:
parent
2c368aaf1a
commit
88d870ce8f
@ -64,6 +64,9 @@ class ValidTopKarmaRequest(BaseModel):
|
|||||||
LastFM
|
LastFM
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
class LastFMException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
class ValidArtistSearchRequest(BaseModel):
|
class ValidArtistSearchRequest(BaseModel):
|
||||||
"""
|
"""
|
||||||
- **a**: artist name
|
- **a**: artist name
|
||||||
@ -81,17 +84,17 @@ class ValidArtistSearchRequest(BaseModel):
|
|||||||
class ValidAlbumDetailRequest(BaseModel):
|
class ValidAlbumDetailRequest(BaseModel):
|
||||||
"""
|
"""
|
||||||
- **a**: artist name
|
- **a**: artist name
|
||||||
- **a2**: album/release name (as sourced from here/LastFM)
|
- **release**: album/release name (as sourced from here/LastFM)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
a: str
|
a: str
|
||||||
a2: str
|
release: str
|
||||||
|
|
||||||
class Config: # pylint: disable=missing-class-docstring
|
class Config: # pylint: disable=missing-class-docstring
|
||||||
schema_extra = {
|
schema_extra = {
|
||||||
"example": {
|
"example": {
|
||||||
"a": "eminem",
|
"a": "eminem",
|
||||||
"a2": "houdini"
|
"release": "houdini"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ import time
|
|||||||
import datetime
|
import datetime
|
||||||
import traceback
|
import traceback
|
||||||
import aiosqlite as sqlite3
|
import aiosqlite as sqlite3
|
||||||
|
from typing import LiteralString, Optional
|
||||||
from fastapi import FastAPI, Request, HTTPException
|
from fastapi import FastAPI, Request, HTTPException
|
||||||
from .constructors import ValidTopKarmaRequest, ValidKarmaRetrievalRequest,\
|
from .constructors import ValidTopKarmaRequest, ValidKarmaRetrievalRequest,\
|
||||||
ValidKarmaUpdateRequest
|
ValidKarmaUpdateRequest
|
||||||
@ -14,11 +15,16 @@ from .constructors import ValidTopKarmaRequest, ValidKarmaRetrievalRequest,\
|
|||||||
class KarmaDB:
|
class KarmaDB:
|
||||||
"""Karma DB Util"""
|
"""Karma DB Util"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.db_path = os.path.join("/", "usr", "local", "share",
|
self.db_path: LiteralString = os.path.join("/", "usr", "local", "share",
|
||||||
"sqlite_dbs", "karma.db")
|
"sqlite_dbs", "karma.db")
|
||||||
|
|
||||||
async def get_karma(self, keyword: str) -> int | dict:
|
async def get_karma(self, keyword: str) -> int | dict:
|
||||||
"""Get Karma Value for Keyword"""
|
"""Get Karma Value for Keyword
|
||||||
|
Args:
|
||||||
|
keyword (str): The keyword to search
|
||||||
|
Returns:
|
||||||
|
int|dict
|
||||||
|
"""
|
||||||
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
|
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
|
||||||
async with await db_conn.execute("SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)) as db_cursor:
|
async with await db_conn.execute("SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)) as db_cursor:
|
||||||
try:
|
try:
|
||||||
@ -30,8 +36,13 @@ class KarmaDB:
|
|||||||
'errorText': f'No records for {keyword}',
|
'errorText': f'No records for {keyword}',
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_top(self, n: int = 10):
|
async def get_top(self, n: Optional[int] = 10) -> list[tuple]:
|
||||||
"""Get Top n=10 Karma Entries"""
|
"""Get Top n=10 Karma Entries
|
||||||
|
Args:
|
||||||
|
n (Optional[int]) = 10: The number of top results to return
|
||||||
|
Returns:
|
||||||
|
list[tuple]
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
|
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
|
||||||
async with await db_conn.execute("SELECT keyword, score FROM karma ORDER BY score DESC LIMIT ?", (n,)) as db_cursor:
|
async with await db_conn.execute("SELECT keyword, score FROM karma ORDER BY score DESC LIMIT ?", (n,)) as db_cursor:
|
||||||
@ -40,25 +51,32 @@ class KarmaDB:
|
|||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
return
|
return
|
||||||
|
|
||||||
async def update_karma(self, granter: str, keyword: str, flag: int):
|
async def update_karma(self, granter: str, keyword: str, flag: int) -> Optional[bool]:
|
||||||
"""Update Karma for Keyword"""
|
"""Update Karma for Keyword
|
||||||
|
Args:
|
||||||
|
granter (str): The user who granted (increased/decreased) the karma
|
||||||
|
keyword (str): The keyword to update
|
||||||
|
flag (int): 0 to increase karma, 1 to decrease karma
|
||||||
|
Returns:
|
||||||
|
Optional[bool]
|
||||||
|
"""
|
||||||
|
|
||||||
if not flag in [0, 1]:
|
if not flag in [0, 1]:
|
||||||
return
|
return
|
||||||
|
|
||||||
modifier = "score + 1" if not flag else "score - 1"
|
modifier: str = "score + 1" if not flag else "score - 1"
|
||||||
query = f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?"
|
query: str = f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?"
|
||||||
new_keyword_query = "INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)"
|
new_keyword_query: str = "INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)"
|
||||||
friendly_flag = "++" if not flag else "--"
|
friendly_flag: str = "++" if not flag else "--"
|
||||||
audit_message = f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
|
audit_message: str = f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
|
||||||
audit_query = "INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)"
|
audit_query: str = "INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)"
|
||||||
now = int(time.time())
|
now: int = int(time.time())
|
||||||
|
|
||||||
logging.debug("Audit message: %s{audit_message}\nKeyword: %s{keyword}")
|
logging.debug("Audit message: %s{audit_message}\nKeyword: %s{keyword}")
|
||||||
|
|
||||||
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
|
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
|
||||||
async with await db_conn.execute(audit_query, (keyword, audit_message,)) as db_cursor:
|
async with await db_conn.execute(audit_query, (keyword, audit_message,)) as db_cursor:
|
||||||
await db_conn.commit()
|
await db_conn.commit()
|
||||||
await db_cursor.close()
|
|
||||||
async with await db_conn.execute(query, (now, keyword,)) as db_cursor:
|
async with await db_conn.execute(query, (now, keyword,)) as db_cursor:
|
||||||
if db_cursor.rowcount:
|
if db_cursor.rowcount:
|
||||||
await db_conn.commit()
|
await db_conn.commit()
|
||||||
@ -72,11 +90,6 @@ class KarmaDB:
|
|||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Karma(FastAPI):
|
class Karma(FastAPI):
|
||||||
"""Karma Endpoints"""
|
"""Karma Endpoints"""
|
||||||
def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
|
def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
|
||||||
@ -86,7 +99,7 @@ class Karma(FastAPI):
|
|||||||
self.glob_state = glob_state
|
self.glob_state = glob_state
|
||||||
self.db = KarmaDB()
|
self.db = KarmaDB()
|
||||||
|
|
||||||
self.endpoints = {
|
self.endpoints: dict = {
|
||||||
"karma/get": self.get_karma_handler,
|
"karma/get": self.get_karma_handler,
|
||||||
"karma/modify": self.modify_karma_handler,
|
"karma/modify": self.modify_karma_handler,
|
||||||
"karma/top": self.top_karma_handler,
|
"karma/top": self.top_karma_handler,
|
||||||
@ -97,23 +110,19 @@ class Karma(FastAPI):
|
|||||||
include_in_schema=False)
|
include_in_schema=False)
|
||||||
|
|
||||||
|
|
||||||
async def top_karma_handler(self, request: Request, data: ValidTopKarmaRequest | None = None):
|
async def top_karma_handler(self, request: Request, data: ValidTopKarmaRequest | None = None) -> list[tuple]|dict:
|
||||||
"""
|
"""Get top keywords for karma"""
|
||||||
/karma/top
|
|
||||||
Get top keywords for karma
|
|
||||||
(Requires key)
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
|
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
|
|
||||||
n = 10
|
n: int = 10
|
||||||
if data:
|
if data:
|
||||||
n = data.n
|
n: int = int(data.n)
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
top10 = await self.db.get_top(n=n)
|
top10: list[tuple] = await self.db.get_top(n=n)
|
||||||
return top10
|
return top10
|
||||||
except:
|
except:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
@ -123,18 +132,14 @@ class Karma(FastAPI):
|
|||||||
}
|
}
|
||||||
|
|
||||||
async def get_karma_handler(self, data: ValidKarmaRetrievalRequest, request: Request):
|
async def get_karma_handler(self, data: ValidKarmaRetrievalRequest, request: Request):
|
||||||
"""
|
"""Get current karma value"""
|
||||||
/karma/get
|
|
||||||
Get current karma value
|
|
||||||
(Requires key)
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
|
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
|
|
||||||
keyword = data.keyword
|
keyword: str = data.keyword
|
||||||
try:
|
try:
|
||||||
count = await self.db.get_karma(keyword)
|
count: int|dict = await self.db.get_karma(keyword)
|
||||||
return {
|
return {
|
||||||
'keyword': keyword,
|
'keyword': keyword,
|
||||||
'count': count,
|
'count': count,
|
||||||
@ -146,12 +151,8 @@ class Karma(FastAPI):
|
|||||||
'errorText': "Exception occurred."
|
'errorText': "Exception occurred."
|
||||||
}
|
}
|
||||||
|
|
||||||
async def modify_karma_handler(self, data: ValidKarmaUpdateRequest, request: Request):
|
async def modify_karma_handler(self, data: ValidKarmaUpdateRequest, request: Request) -> dict:
|
||||||
"""
|
"""Update karma count"""
|
||||||
/karma/update
|
|
||||||
Update karma count
|
|
||||||
(Requires key)
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With'), 2):
|
if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With'), 2):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
|
@ -3,9 +3,10 @@
|
|||||||
|
|
||||||
import importlib
|
import importlib
|
||||||
import traceback
|
import traceback
|
||||||
|
from typing import Optional
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from .constructors import ValidArtistSearchRequest, ValidAlbumDetailRequest,\
|
from .constructors import ValidArtistSearchRequest, ValidAlbumDetailRequest,\
|
||||||
ValidTrackInfoRequest
|
ValidTrackInfoRequest, LastFMException
|
||||||
|
|
||||||
class LastFM(FastAPI):
|
class LastFM(FastAPI):
|
||||||
"""Last.FM Endpoints"""
|
"""Last.FM Endpoints"""
|
||||||
@ -34,7 +35,7 @@ class LastFM(FastAPI):
|
|||||||
Get artist info
|
Get artist info
|
||||||
- **a**: Artist to search
|
- **a**: Artist to search
|
||||||
"""
|
"""
|
||||||
artist = data.a.strip()
|
artist: Optional[str] = data.a.strip()
|
||||||
if not artist:
|
if not artist:
|
||||||
return {
|
return {
|
||||||
'err': True,
|
'err': True,
|
||||||
@ -53,24 +54,24 @@ class LastFM(FastAPI):
|
|||||||
'result': artist_result
|
'result': artist_result
|
||||||
}
|
}
|
||||||
|
|
||||||
async def artist_album_handler(self, data: ValidArtistSearchRequest):
|
async def artist_album_handler(self, data: ValidArtistSearchRequest) -> dict:
|
||||||
"""
|
"""
|
||||||
Get artist's albums/releases
|
Get artist's albums/releases
|
||||||
- **a**: Artist to search
|
- **a**: Artist to search
|
||||||
"""
|
"""
|
||||||
artist = data.a.strip()
|
artist: str = data.a.strip()
|
||||||
if not artist:
|
if not artist:
|
||||||
return {
|
return {
|
||||||
'err': True,
|
'err': True,
|
||||||
'errorText': 'No artist specified'
|
'errorText': 'No artist specified'
|
||||||
}
|
}
|
||||||
|
|
||||||
album_result = await self.lastfm.get_artist_albums(artist=artist)
|
album_result: dict|list[dict] = await self.lastfm.get_artist_albums(artist=artist)
|
||||||
album_result_out = []
|
album_result_out: list = []
|
||||||
seen_release_titles = []
|
seen_release_titles: list = []
|
||||||
|
|
||||||
for release in album_result:
|
for release in album_result:
|
||||||
release_title = release.get('title')
|
release_title: str = release.get('title')
|
||||||
if release_title.lower() in seen_release_titles:
|
if release_title.lower() in seen_release_titles:
|
||||||
continue
|
continue
|
||||||
seen_release_titles.append(release_title.lower())
|
seen_release_titles.append(release_title.lower())
|
||||||
@ -81,14 +82,14 @@ class LastFM(FastAPI):
|
|||||||
'result': album_result_out
|
'result': album_result_out
|
||||||
}
|
}
|
||||||
|
|
||||||
async def release_detail_handler(self, data: ValidAlbumDetailRequest):
|
async def release_detail_handler(self, data: ValidAlbumDetailRequest) -> dict:
|
||||||
"""
|
"""
|
||||||
Get details of a particular release by an artist
|
Get details of a particular release by an artist
|
||||||
- **a**: Artist to search
|
- **a**: Artist to search
|
||||||
- **a2**: Release title to search (subject to change)
|
- **release**: Release title to search
|
||||||
"""
|
"""
|
||||||
artist = data.a.strip()
|
artist: str = data.a.strip()
|
||||||
release = data.a2.strip()
|
release: str = data.release.strip()
|
||||||
|
|
||||||
if not artist or not release:
|
if not artist or not release:
|
||||||
return {
|
return {
|
||||||
@ -110,14 +111,14 @@ class LastFM(FastAPI):
|
|||||||
'result': ret_obj
|
'result': ret_obj
|
||||||
}
|
}
|
||||||
|
|
||||||
async def release_tracklist_handler(self, data: ValidAlbumDetailRequest):
|
async def release_tracklist_handler(self, data: ValidAlbumDetailRequest) -> dict:
|
||||||
"""
|
"""
|
||||||
Get track list for a particular release by an artist
|
Get track list for a particular release by an artist
|
||||||
- **a**: Artist to search
|
- **a**: Artist to search
|
||||||
- **a2**: Release title to search (subject to change)
|
- **release**: Release title to search
|
||||||
"""
|
"""
|
||||||
artist = data.a.strip()
|
artist: str = data.a.strip()
|
||||||
release = data.a2.strip()
|
release: str = data.release.strip()
|
||||||
|
|
||||||
if not artist or not release:
|
if not artist or not release:
|
||||||
return {
|
return {
|
||||||
@ -125,7 +126,7 @@ class LastFM(FastAPI):
|
|||||||
'errorText': 'Invalid request'
|
'errorText': 'Invalid request'
|
||||||
}
|
}
|
||||||
|
|
||||||
tracklist_result = await self.lastfm.get_album_tracklist(artist=artist, album=release)
|
tracklist_result: dict = await self.lastfm.get_album_tracklist(artist=artist, album=release)
|
||||||
return {
|
return {
|
||||||
'success': True,
|
'success': True,
|
||||||
'id': tracklist_result.get('id'),
|
'id': tracklist_result.get('id'),
|
||||||
@ -135,15 +136,15 @@ class LastFM(FastAPI):
|
|||||||
'tracks': tracklist_result.get('tracks')
|
'tracks': tracklist_result.get('tracks')
|
||||||
}
|
}
|
||||||
|
|
||||||
async def track_info_handler(self, data: ValidTrackInfoRequest):
|
async def track_info_handler(self, data: ValidTrackInfoRequest) -> dict:
|
||||||
"""
|
"""
|
||||||
Get track info from Last.FM given an artist/track
|
Get track info from Last.FM given an artist/track
|
||||||
- **a**: Artist to search
|
- **a**: Artist to search
|
||||||
- **t**: Track title to search
|
- **t**: Track title to search
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
artist = data.a
|
artist: str = data.a
|
||||||
track = data.t
|
track: str = data.t
|
||||||
|
|
||||||
if not artist or not track:
|
if not artist or not track:
|
||||||
return {
|
return {
|
||||||
@ -151,8 +152,10 @@ class LastFM(FastAPI):
|
|||||||
'errorText': 'Invalid request'
|
'errorText': 'Invalid request'
|
||||||
}
|
}
|
||||||
|
|
||||||
track_info_result = await self.lastfm.get_track_info(artist=artist, track=track)
|
track_info_result: dict = await self.lastfm.get_track_info(artist=artist, track=track)
|
||||||
assert not "err" in track_info_result.keys()
|
if "err" in track_info_result:
|
||||||
|
raise LastFMException("Unknown error occurred: %s",
|
||||||
|
track_info_result.get('errorText', '??'))
|
||||||
return {
|
return {
|
||||||
'success': True,
|
'success': True,
|
||||||
'result': track_info_result
|
'result': track_info_result
|
||||||
|
@ -7,27 +7,29 @@ import urllib.parse
|
|||||||
import regex
|
import regex
|
||||||
import aiosqlite as sqlite3
|
import aiosqlite as sqlite3
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, HTTPException
|
||||||
|
from typing import LiteralString, Optional, Pattern
|
||||||
from .constructors import ValidTypeAheadRequest, ValidLyricRequest
|
from .constructors import ValidTypeAheadRequest, ValidLyricRequest
|
||||||
|
from lyric_search.constructors import LyricsResult
|
||||||
from lyric_search.sources import aggregate
|
from lyric_search.sources import aggregate
|
||||||
from lyric_search import notifier
|
from lyric_search import notifier
|
||||||
|
|
||||||
class CacheUtils:
|
class CacheUtils:
|
||||||
"""Lyrics Cache DB Utils"""
|
"""Lyrics Cache DB Utils"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.lyrics_db_path = os.path.join("/", "usr", "local", "share",
|
self.lyrics_db_path: LiteralString = os.path.join("/", "usr", "local", "share",
|
||||||
"sqlite_dbs", "cached_lyrics.db")
|
"sqlite_dbs", "cached_lyrics.db")
|
||||||
|
|
||||||
async def check_typeahead(self, s: str, pre_query: str | None = None):
|
async def check_typeahead(self, s: str, pre_query: str | None = None) -> Optional[list[dict]]:
|
||||||
"""Check s against artists stored - for typeahead"""
|
"""Check s against artists stored - for typeahead"""
|
||||||
async with sqlite3.connect(self.lyrics_db_path,
|
async with sqlite3.connect(self.lyrics_db_path,
|
||||||
timeout=2) as db_conn:
|
timeout=2) as db_conn:
|
||||||
db_conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])
|
db_conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)])
|
||||||
if not pre_query:
|
if not pre_query:
|
||||||
query = "SELECT distinct(artist) FROM lyrics WHERE artist LIKE ? LIMIT 15"
|
query: str = "SELECT distinct(artist) FROM lyrics WHERE artist LIKE ? LIMIT 15"
|
||||||
query_params = (f"%{s}%",)
|
query_params: tuple = (f"%{s}%",)
|
||||||
else:
|
else:
|
||||||
query = "SELECT distinct(song) FROM lyrics WHERE artist LIKE ? AND song LIKE ? LIMIT 15"
|
query: str = "SELECT distinct(song) FROM lyrics WHERE artist LIKE ? AND song LIKE ? LIMIT 15"
|
||||||
query_params = (f"%{pre_query}%", f"%{s}%",)
|
query_params: tuple = (f"%{pre_query}%", f"%{s}%",)
|
||||||
async with await db_conn.execute(query, query_params) as db_cursor:
|
async with await db_conn.execute(query, query_params) as db_cursor:
|
||||||
return await db_cursor.fetchall()
|
return await db_cursor.fetchall()
|
||||||
|
|
||||||
@ -43,14 +45,14 @@ class LyricSearch(FastAPI):
|
|||||||
self.notifier = notifier.DiscordNotifier()
|
self.notifier = notifier.DiscordNotifier()
|
||||||
|
|
||||||
|
|
||||||
self.endpoints = {
|
self.endpoints: dict = {
|
||||||
"typeahead/artist": self.artist_typeahead_handler,
|
"typeahead/artist": self.artist_typeahead_handler,
|
||||||
"typeahead/song": self.song_typeahead_handler,
|
"typeahead/song": self.song_typeahead_handler,
|
||||||
"lyric_search": self.lyric_search_handler, # Preserving old endpoint path temporarily
|
"lyric_search": self.lyric_search_handler, # Preserving old endpoint path temporarily
|
||||||
"lyric/search": self.lyric_search_handler,
|
"lyric/search": self.lyric_search_handler,
|
||||||
}
|
}
|
||||||
|
|
||||||
self.acceptable_request_sources = [
|
self.acceptable_request_sources: list = [
|
||||||
"WEB",
|
"WEB",
|
||||||
"WEB-RADIO",
|
"WEB-RADIO",
|
||||||
"IRC-MS",
|
"IRC-MS",
|
||||||
@ -62,25 +64,25 @@ class LyricSearch(FastAPI):
|
|||||||
"LIMNORIA-SHARED",
|
"LIMNORIA-SHARED",
|
||||||
]
|
]
|
||||||
|
|
||||||
self.lrc_regex = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}')
|
self.lrc_regex: Pattern = regex.compile(r'\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}')
|
||||||
|
|
||||||
for endpoint, handler in self.endpoints.items():
|
for endpoint, handler in self.endpoints.items():
|
||||||
_schema_include = endpoint in ["lyric/search"]
|
_schema_include = endpoint in ["lyric/search"]
|
||||||
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include)
|
app.add_api_route(f"/{endpoint}", handler, methods=["POST"], include_in_schema=_schema_include)
|
||||||
|
|
||||||
async def artist_typeahead_handler(self, data: ValidTypeAheadRequest):
|
async def artist_typeahead_handler(self, data: ValidTypeAheadRequest) -> list[str]|dict:
|
||||||
"""Artist Type Ahead Handler"""
|
"""Artist Type Ahead Handler"""
|
||||||
if not isinstance(data.query, str) or len(data.query) < 2:
|
if not isinstance(data.query, str) or len(data.query) < 2:
|
||||||
return {
|
return {
|
||||||
'err': True,
|
'err': True,
|
||||||
'errorText': 'Invalid request',
|
'errorText': 'Invalid request',
|
||||||
}
|
}
|
||||||
query = data.query
|
query: str = data.query
|
||||||
typeahead_result = await self.cache_utils.check_typeahead(query)
|
typeahead_result: Optional[list[dict]] = await self.cache_utils.check_typeahead(query)
|
||||||
typeahead_list = [str(r.get('artist')) for r in typeahead_result]
|
typeahead_list: list[str] = [str(r.get('artist')) for r in typeahead_result]
|
||||||
return typeahead_list
|
return typeahead_list
|
||||||
|
|
||||||
async def song_typeahead_handler(self, data: ValidTypeAheadRequest):
|
async def song_typeahead_handler(self, data: ValidTypeAheadRequest) -> list[str]|dict:
|
||||||
"""Song Type Ahead Handler"""
|
"""Song Type Ahead Handler"""
|
||||||
if not isinstance(data.pre_query, str)\
|
if not isinstance(data.pre_query, str)\
|
||||||
or not isinstance(data.query, str|None):
|
or not isinstance(data.query, str|None):
|
||||||
@ -88,13 +90,13 @@ class LyricSearch(FastAPI):
|
|||||||
'err': True,
|
'err': True,
|
||||||
'errorText': 'Invalid request',
|
'errorText': 'Invalid request',
|
||||||
}
|
}
|
||||||
pre_query = data.pre_query
|
pre_query: str = data.pre_query
|
||||||
query = data.query
|
query: str = data.query
|
||||||
typeahead_result = await self.cache_utils.check_typeahead(query, pre_query)
|
typeahead_result: Optional[list[dict]] = await self.cache_utils.check_typeahead(query, pre_query)
|
||||||
typeahead_list = [str(r.get('song')) for r in typeahead_result]
|
typeahead_list: list[str] = [str(r.get('song')) for r in typeahead_result]
|
||||||
return typeahead_list
|
return typeahead_list
|
||||||
|
|
||||||
async def lyric_search_handler(self, data: ValidLyricRequest):
|
async def lyric_search_handler(self, data: ValidLyricRequest) -> dict:
|
||||||
"""
|
"""
|
||||||
Search for lyrics
|
Search for lyrics
|
||||||
|
|
||||||
@ -129,15 +131,15 @@ class LyricSearch(FastAPI):
|
|||||||
|
|
||||||
|
|
||||||
if search_artist and search_song:
|
if search_artist and search_song:
|
||||||
search_artist = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip())
|
search_artist: str = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_artist.strip())
|
||||||
search_song = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip())
|
search_song: str = self.constants.DOUBLE_SPACE_REGEX.sub(" ", search_song.strip())
|
||||||
search_artist = urllib.parse.unquote(search_artist)
|
search_artist: str = urllib.parse.unquote(search_artist)
|
||||||
search_song = urllib.parse.unquote(search_song)
|
search_song: str = urllib.parse.unquote(search_song)
|
||||||
|
|
||||||
excluded_sources = data.excluded_sources
|
excluded_sources: list = data.excluded_sources
|
||||||
aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
|
aggregate_search = aggregate.Aggregate(exclude_methods=excluded_sources)
|
||||||
plain_lyrics = not data.lrc
|
plain_lyrics: bool = not data.lrc
|
||||||
result = await aggregate_search.search(search_artist, search_song, plain_lyrics)
|
result: Optional[LyricsResult] = await aggregate_search.search(search_artist, search_song, plain_lyrics)
|
||||||
|
|
||||||
if not result:
|
if not result:
|
||||||
return {
|
return {
|
||||||
@ -145,15 +147,15 @@ class LyricSearch(FastAPI):
|
|||||||
'errorText': 'Sources exhausted, lyrics not located.',
|
'errorText': 'Sources exhausted, lyrics not located.',
|
||||||
}
|
}
|
||||||
|
|
||||||
result = result.todict()
|
result: dict = result.todict()
|
||||||
|
|
||||||
if data.sub and not data.lrc:
|
if data.sub and not data.lrc:
|
||||||
seeked_found_line = None
|
seeked_found_line: Optional[int] = None
|
||||||
lyric_lines = result['lyrics'].strip().split(" / ")
|
lyric_lines: list[str] = result['lyrics'].strip().split(" / ")
|
||||||
for i, line in enumerate(lyric_lines):
|
for i, line in enumerate(lyric_lines):
|
||||||
line = regex.sub(r'\u2064', '', line.strip())
|
line: str = regex.sub(r'\u2064', '', line.strip())
|
||||||
if data.sub.strip().lower() in line.strip().lower():
|
if data.sub.strip().lower() in line.strip().lower():
|
||||||
seeked_found_line = i
|
seeked_found_line: int = i
|
||||||
logging.debug("Found %s at %s, match for %s!",
|
logging.debug("Found %s at %s, match for %s!",
|
||||||
line, seeked_found_line, data.sub) # REMOVEME: DEBUG
|
line, seeked_found_line, data.sub) # REMOVEME: DEBUG
|
||||||
break
|
break
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
# pylint: disable=bare-except, broad-exception-caught, invalid-name
|
# pylint: disable=bare-except, broad-exception-caught, invalid-name
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
from typing import Optional
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
from lyric_search.sources import private, cache as LyricsCache, redis_cache
|
from lyric_search.sources import private, cache as LyricsCache, redis_cache
|
||||||
@ -18,7 +19,7 @@ class Misc(FastAPI):
|
|||||||
self.redis_cache = redis_cache.RedisCache()
|
self.redis_cache = redis_cache.RedisCache()
|
||||||
self.redis_client = redis.Redis(password=private.REDIS_PW)
|
self.redis_client = redis.Redis(password=private.REDIS_PW)
|
||||||
self.radio = radio
|
self.radio = radio
|
||||||
self.endpoints = {
|
self.endpoints: dict = {
|
||||||
"widget/redis": self.homepage_redis_widget,
|
"widget/redis": self.homepage_redis_widget,
|
||||||
"widget/sqlite": self.homepage_sqlite_widget,
|
"widget/sqlite": self.homepage_sqlite_widget,
|
||||||
"widget/lyrics": self.homepage_lyrics_widget,
|
"widget/lyrics": self.homepage_lyrics_widget,
|
||||||
@ -29,54 +30,35 @@ class Misc(FastAPI):
|
|||||||
app.add_api_route(f"/{endpoint}", handler, methods=["GET"],
|
app.add_api_route(f"/{endpoint}", handler, methods=["GET"],
|
||||||
include_in_schema=False)
|
include_in_schema=False)
|
||||||
|
|
||||||
async def get_radio_np(self) -> dict:
|
async def get_radio_np(self) -> str:
|
||||||
"""
|
"""
|
||||||
Get radio now playing
|
Get radio now playing
|
||||||
Uses XC endpoint
|
|
||||||
Args:
|
Args:
|
||||||
None
|
None
|
||||||
Returns:
|
Returns:
|
||||||
str: Radio now playing in artist - song format
|
str: Radio now playing in artist - song format
|
||||||
"""
|
"""
|
||||||
|
|
||||||
json_payload = {
|
artistsong: Optional[str] = self.radio.radio_util.now_playing['artistsong']
|
||||||
'bid': 0,
|
|
||||||
'cmd': 'radio_metadata',
|
|
||||||
'key': f'Bearer {self.radio_pubkey}',
|
|
||||||
}
|
|
||||||
|
|
||||||
headers = {
|
|
||||||
'content-type': 'application/json; charset=utf-8',
|
|
||||||
}
|
|
||||||
|
|
||||||
artistsong = self.radio.now_playing['artistsong']
|
|
||||||
if not isinstance(artistsong, str):
|
if not isinstance(artistsong, str):
|
||||||
return "N/A - N/A"
|
return "N/A - N/A"
|
||||||
return artistsong
|
return artistsong
|
||||||
|
|
||||||
|
|
||||||
async def homepage_redis_widget(self) -> dict:
|
async def homepage_redis_widget(self) -> dict:
|
||||||
"""
|
"""Homepage Redis Widget Handler"""
|
||||||
/widget/redis
|
|
||||||
Homepage Redis Widget Handler
|
|
||||||
Args:
|
|
||||||
None
|
|
||||||
Returns:
|
|
||||||
dict
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Measure response time w/ test lyric search
|
# Measure response time w/ test lyric search
|
||||||
time_start: float = time.time() # Start time for response_time
|
time_start: float = time.time() # Start time for response_time
|
||||||
test_lyrics_result = await self.redis_client.ft().search("@artist: test @song: test")
|
test_lyrics_result = await self.redis_client.ft().search("@artist: test @song: test")
|
||||||
time_end: float = time.time()
|
time_end: float = time.time()
|
||||||
# End response time test
|
# End response time test
|
||||||
|
|
||||||
total_keys = await self.redis_client.dbsize()
|
total_keys = await self.redis_client.dbsize()
|
||||||
response_time: float = time_end - time_start
|
response_time: float = time_end - time_start
|
||||||
(_, ci_keys) = await self.redis_client.scan(cursor=0, match="ci_session*", count=10000000)
|
(_, ci_keys) = await self.redis_client.scan(cursor=0, match="ci_session*", count=10000000)
|
||||||
num_ci_keys = len(ci_keys)
|
num_ci_keys = len(ci_keys)
|
||||||
index_info = await self.redis_client.ft().info()
|
index_info = await self.redis_client.ft().info()
|
||||||
indexed_lyrics = index_info.get('num_docs')
|
indexed_lyrics: int = index_info.get('num_docs')
|
||||||
return {
|
return {
|
||||||
'responseTime': round(response_time, 7),
|
'responseTime': round(response_time, 7),
|
||||||
'storedKeys': total_keys,
|
'storedKeys': total_keys,
|
||||||
@ -85,17 +67,11 @@ class Misc(FastAPI):
|
|||||||
}
|
}
|
||||||
|
|
||||||
async def homepage_sqlite_widget(self) -> dict:
|
async def homepage_sqlite_widget(self) -> dict:
|
||||||
"""
|
"""Homepage SQLite Widget Handler"""
|
||||||
/widget/sqlite
|
|
||||||
Homepage SQLite Widget Handler
|
row_count: int = await self.lyr_cache.sqlite_rowcount()
|
||||||
Args:
|
distinct_artists: int = await self.lyr_cache.sqlite_distinct("artist")
|
||||||
None
|
lyrics_length: int = await self.lyr_cache.sqlite_lyrics_length()
|
||||||
Returns:
|
|
||||||
dict
|
|
||||||
"""
|
|
||||||
row_count = await self.lyr_cache.sqlite_rowcount()
|
|
||||||
distinct_artists = await self.lyr_cache.sqlite_distinct("artist")
|
|
||||||
lyrics_length = await self.lyr_cache.sqlite_lyrics_length()
|
|
||||||
return {
|
return {
|
||||||
'storedRows': row_count,
|
'storedRows': row_count,
|
||||||
'distinctArtists': distinct_artists,
|
'distinctArtists': distinct_artists,
|
||||||
@ -103,26 +79,12 @@ class Misc(FastAPI):
|
|||||||
}
|
}
|
||||||
|
|
||||||
async def homepage_lyrics_widget(self) -> dict:
|
async def homepage_lyrics_widget(self) -> dict:
|
||||||
"""
|
"""Homepage Lyrics Widget Handler"""
|
||||||
/widget/lyrics
|
|
||||||
Homepage Lyrics Widget Handler
|
return await self.redis_cache.get_found_counts()
|
||||||
Args:
|
|
||||||
None
|
|
||||||
Returns:
|
|
||||||
dict
|
|
||||||
"""
|
|
||||||
counts = await self.redis_cache.get_found_counts()
|
|
||||||
return counts
|
|
||||||
|
|
||||||
async def homepage_radio_widget(self) -> dict:
|
async def homepage_radio_widget(self) -> dict:
|
||||||
"""
|
"""Homepage Radio Widget Handler"""
|
||||||
/widget/radio
|
|
||||||
Homepage Radio Widget Handler
|
|
||||||
Args:
|
|
||||||
None
|
|
||||||
Returns:
|
|
||||||
dict
|
|
||||||
"""
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'now_playing': await self.get_radio_np(),
|
'now_playing': await self.get_radio_np(),
|
||||||
|
@ -13,18 +13,12 @@ from . import radio_util
|
|||||||
from .constructors import ValidRadioNextRequest, ValidRadioReshuffleRequest, ValidRadioQueueShiftRequest,\
|
from .constructors import ValidRadioNextRequest, ValidRadioReshuffleRequest, ValidRadioQueueShiftRequest,\
|
||||||
ValidRadioQueueRemovalRequest, ValidRadioSongRequest, RadioException
|
ValidRadioQueueRemovalRequest, ValidRadioSongRequest, RadioException
|
||||||
from uuid import uuid4 as uuid
|
from uuid import uuid4 as uuid
|
||||||
from typing import Optional
|
from typing import Optional, LiteralString
|
||||||
from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException
|
from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException
|
||||||
from fastapi.responses import RedirectResponse
|
from fastapi.responses import RedirectResponse
|
||||||
from aiohttp import ClientSession, ClientTimeout
|
from aiohttp import ClientSession, ClientTimeout
|
||||||
# pylint: disable=bare-except, broad-exception-caught, invalid-name
|
# pylint: disable=bare-except, broad-exception-caught, invalid-name
|
||||||
|
|
||||||
double_space = regex.compile(r'\s{2,}')
|
|
||||||
|
|
||||||
"""
|
|
||||||
TODO:
|
|
||||||
minor refactoring/type annotations/docstrings
|
|
||||||
"""
|
|
||||||
|
|
||||||
class Radio(FastAPI):
|
class Radio(FastAPI):
|
||||||
"""Radio Endpoints"""
|
"""Radio Endpoints"""
|
||||||
@ -34,24 +28,8 @@ class Radio(FastAPI):
|
|||||||
self.constants = constants
|
self.constants = constants
|
||||||
self.radio_util = radio_util.RadioUtil(self.constants)
|
self.radio_util = radio_util.RadioUtil(self.constants)
|
||||||
self.glob_state = glob_state
|
self.glob_state = glob_state
|
||||||
self.ls_uri = "http://10.10.10.101:29000"
|
|
||||||
self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
|
self.endpoints: dict = {
|
||||||
self.active_playlist_path = os.path.join("/usr/local/share",
|
|
||||||
"sqlite_dbs", "track_file_map.db")
|
|
||||||
self.active_playlist_name = "default" # not used
|
|
||||||
self.active_playlist = []
|
|
||||||
self.now_playing = {
|
|
||||||
'artist': 'N/A',
|
|
||||||
'song': 'N/A',
|
|
||||||
'genre': 'N/A',
|
|
||||||
'artistsong': 'N/A - N/A',
|
|
||||||
'duration': 0,
|
|
||||||
'start': 0,
|
|
||||||
'end': 0,
|
|
||||||
'file_path': None,
|
|
||||||
'id': None,
|
|
||||||
}
|
|
||||||
self.endpoints = {
|
|
||||||
"radio/np": self.radio_now_playing,
|
"radio/np": self.radio_now_playing,
|
||||||
"radio/request": self.radio_request,
|
"radio/request": self.radio_request,
|
||||||
"radio/get_queue": self.radio_get_queue,
|
"radio/get_queue": self.radio_get_queue,
|
||||||
@ -69,30 +47,8 @@ class Radio(FastAPI):
|
|||||||
# NOTE: Not in loop because method is GET for this endpoint
|
# NOTE: Not in loop because method is GET for this endpoint
|
||||||
app.add_api_route("/radio/album_art", self.album_art_handler, methods=["GET"],
|
app.add_api_route("/radio/album_art", self.album_art_handler, methods=["GET"],
|
||||||
include_in_schema=True)
|
include_in_schema=True)
|
||||||
asyncio.get_event_loop().run_until_complete(self.load_playlist())
|
asyncio.get_event_loop().run_until_complete(self.radio_util.load_playlist())
|
||||||
asyncio.get_event_loop().run_until_complete(self._ls_skip())
|
asyncio.get_event_loop().run_until_complete(self.radio_util._ls_skip())
|
||||||
|
|
||||||
|
|
||||||
def get_queue_item_by_uuid(self, uuid: str) -> tuple[int, dict] | None:
|
|
||||||
"""
|
|
||||||
Get queue item by UUID
|
|
||||||
Args:
|
|
||||||
uuid: The UUID to search
|
|
||||||
Returns:
|
|
||||||
dict|None
|
|
||||||
"""
|
|
||||||
for x, item in enumerate(self.active_playlist):
|
|
||||||
if item.get('uuid') == uuid:
|
|
||||||
return (x, item)
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def _ls_skip(self) -> bool:
|
|
||||||
async with ClientSession() as session:
|
|
||||||
async with session.get(f"{self.ls_uri}/next",
|
|
||||||
timeout=ClientTimeout(connect=2, sock_read=2)) as request:
|
|
||||||
request.raise_for_status()
|
|
||||||
text = await request.text()
|
|
||||||
return text == "OK"
|
|
||||||
|
|
||||||
async def radio_skip(self, data: ValidRadioNextRequest, request: Request) -> bool:
|
async def radio_skip(self, data: ValidRadioNextRequest, request: Request) -> bool:
|
||||||
"""
|
"""
|
||||||
@ -102,11 +58,11 @@ class Radio(FastAPI):
|
|||||||
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
if data.skipTo:
|
if data.skipTo:
|
||||||
(x, _) = self.get_queue_item_by_uuid(data.skipTo)
|
(x, _) = self.radio_util.get_queue_item_by_uuid(data.skipTo)
|
||||||
self.active_playlist = self.active_playlist[x:]
|
self.radio_util.active_playlist: list = self.radio_util.active_playlist[x:]
|
||||||
if not self.active_playlist:
|
if not self.radio_util.active_playlist:
|
||||||
await self.load_playlist()
|
await self.radio_util.load_playlist()
|
||||||
return await self._ls_skip()
|
return await self.radio_util._ls_skip()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
return False
|
return False
|
||||||
@ -119,7 +75,7 @@ class Radio(FastAPI):
|
|||||||
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
|
|
||||||
random.shuffle(self.active_playlist)
|
random.shuffle(self.radio_util.active_playlist)
|
||||||
return {
|
return {
|
||||||
'ok': True
|
'ok': True
|
||||||
}
|
}
|
||||||
@ -128,13 +84,9 @@ class Radio(FastAPI):
|
|||||||
async def radio_get_queue(self, request: Request, limit: int = 20_000) -> dict:
|
async def radio_get_queue(self, request: Request, limit: int = 20_000) -> dict:
|
||||||
"""
|
"""
|
||||||
Get current play queue, up to limit n [default: 20k]
|
Get current play queue, up to limit n [default: 20k]
|
||||||
Args:
|
|
||||||
limit (int): Number of results to return (default 20k)
|
|
||||||
Returns:
|
|
||||||
dict
|
|
||||||
"""
|
"""
|
||||||
queue_out = []
|
queue_out = []
|
||||||
for x, item in enumerate(self.active_playlist[0:limit+1]):
|
for x, item in enumerate(self.radio_util.active_playlist[0:limit+1]):
|
||||||
queue_out.append({
|
queue_out.append({
|
||||||
'pos': x,
|
'pos': x,
|
||||||
'id': item.get('id'),
|
'id': item.get('id'),
|
||||||
@ -153,11 +105,11 @@ class Radio(FastAPI):
|
|||||||
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
|
|
||||||
(x, item) = self.get_queue_item_by_uuid(data.uuid)
|
(x, item) = self.radio_util.get_queue_item_by_uuid(data.uuid)
|
||||||
self.active_playlist.pop(x)
|
self.radio_util.active_playlist.pop(x)
|
||||||
self.active_playlist.insert(0, item)
|
self.radio_util.active_playlist.insert(0, item)
|
||||||
if not data.next:
|
if not data.next:
|
||||||
await self._ls_skip()
|
await self.radio_util._ls_skip()
|
||||||
return {
|
return {
|
||||||
'ok': True,
|
'ok': True,
|
||||||
}
|
}
|
||||||
@ -167,158 +119,25 @@ class Radio(FastAPI):
|
|||||||
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
|
|
||||||
(x, found_item) = self.get_queue_item_by_uuid(data.uuid)
|
(x, found_item) = self.radio_util.get_queue_item_by_uuid(data.uuid)
|
||||||
if not found_item:
|
if not found_item:
|
||||||
return {
|
return {
|
||||||
'ok': False,
|
'ok': False,
|
||||||
'err': 'UUID not found in play queue',
|
'err': 'UUID not found in play queue',
|
||||||
}
|
}
|
||||||
self.active_playlist.pop(x)
|
self.radio_util.active_playlist.pop(x)
|
||||||
return {
|
return {
|
||||||
'ok': True,
|
'ok': True,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async def search_playlist(self, artistsong: str|None = None, artist: str|None = None, song: str|None = None) -> bool:
|
|
||||||
if artistsong and (artist or song):
|
|
||||||
raise RadioException("Cannot search using combination provided")
|
|
||||||
if not artistsong and (not artist or not song):
|
|
||||||
raise RadioException("No query provided")
|
|
||||||
try:
|
|
||||||
search_artist = None
|
|
||||||
search_song = None
|
|
||||||
search_query = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, genre, file_path, duration FROM tracks\
|
|
||||||
WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
|
|
||||||
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
|
|
||||||
if artistsong:
|
|
||||||
artistsong_split = artistsong.split(" - ", maxsplit=1)
|
|
||||||
(search_artist, search_song) = tuple(artistsong_split)
|
|
||||||
else:
|
|
||||||
search_artist = artist
|
|
||||||
search_song = song
|
|
||||||
if not artistsong:
|
|
||||||
artistsong = f"{search_artist} - {search_song}"
|
|
||||||
search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)
|
|
||||||
async with sqlite3.connect(self.active_playlist_path,
|
|
||||||
timeout=2) as db_conn:
|
|
||||||
await db_conn.enable_load_extension(True)
|
|
||||||
for ext in self.sqlite_exts:
|
|
||||||
await db_conn.load_extension(ext)
|
|
||||||
db_conn.row_factory = sqlite3.Row
|
|
||||||
async with await db_conn.execute(search_query, search_params) as db_cursor:
|
|
||||||
result = await db_cursor.fetchone()
|
|
||||||
if not result:
|
|
||||||
return False
|
|
||||||
pushObj = {
|
|
||||||
'id': result['id'],
|
|
||||||
'uuid': str(uuid().hex),
|
|
||||||
'artist': result['artist'].strip(),
|
|
||||||
'song': result['song'].strip(),
|
|
||||||
'artistsong': result['artistsong'].strip(),
|
|
||||||
'genre': result['genre'],
|
|
||||||
'file_path': result['file_path'],
|
|
||||||
'duration': result['duration'],
|
|
||||||
}
|
|
||||||
self.active_playlist.insert(0, pushObj)
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logging.critical("search_playlist:: Search error occurred: %s", str(e))
|
|
||||||
traceback.print_exc()
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def load_playlist(self):
|
|
||||||
try:
|
|
||||||
logging.info(f"Loading playlist...")
|
|
||||||
self.active_playlist.clear()
|
|
||||||
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
|
|
||||||
# GROUP BY artistdashsong ORDER BY RANDOM()'
|
|
||||||
|
|
||||||
"""
|
|
||||||
LIMITED GENRES
|
|
||||||
"""
|
|
||||||
|
|
||||||
db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
|
|
||||||
WHERE genre IN ("metalcore", "pop punk", "punk rock", "metal", "punk", "electronic", "nu metal", "EDM",\
|
|
||||||
"post-hardcore", "pop rock", "experimental", "post-punk", "death metal", "electronicore", "hard rock", "psychedelic rock",\
|
|
||||||
"grunge", "house", "dubstep", "hardcore", "hair metal", "horror punk", "folk punk", "breakcore",\
|
|
||||||
"post-rock", "deathcore", "hardcore punk", "synthwave", "trap") GROUP BY artistdashsong ORDER BY RANDOM()'
|
|
||||||
|
|
||||||
async with sqlite3.connect(self.active_playlist_path,
|
|
||||||
timeout=2) as db_conn:
|
|
||||||
db_conn.row_factory = sqlite3.Row
|
|
||||||
async with await db_conn.execute(db_query) as db_cursor:
|
|
||||||
results = await db_cursor.fetchall()
|
|
||||||
self.active_playlist = [{
|
|
||||||
'uuid': str(uuid().hex),
|
|
||||||
'id': r['id'],
|
|
||||||
'artist': double_space.sub(' ', r['artist']).strip(),
|
|
||||||
'song': double_space.sub(' ', r['song']).strip(),
|
|
||||||
'genre': r['genre'] if r['genre'] else 'Unknown',
|
|
||||||
'artistsong': double_space.sub(' ', r['artistdashsong']).strip(),
|
|
||||||
'file_path': r['file_path'],
|
|
||||||
'duration': r['duration'],
|
|
||||||
} for r in results]
|
|
||||||
logging.info("Populated active playlists with %s items",
|
|
||||||
len(self.active_playlist))
|
|
||||||
except:
|
|
||||||
traceback.print_exc()
|
|
||||||
|
|
||||||
async def cache_album_art(self, track_id: int, album_art: bytes) -> None:
|
|
||||||
try:
|
|
||||||
async with sqlite3.connect(self.active_playlist_path,
|
|
||||||
timeout=2) as db_conn:
|
|
||||||
async with await db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?",
|
|
||||||
(album_art, track_id,)) as db_cursor:
|
|
||||||
await db_conn.commit()
|
|
||||||
except:
|
|
||||||
traceback.print_exc()
|
|
||||||
|
|
||||||
async def get_album_art(self, track_id: Optional[int] = None,
|
|
||||||
file_path: Optional[str] = None) -> bytes:
|
|
||||||
try:
|
|
||||||
async with sqlite3.connect(self.active_playlist_path,
|
|
||||||
timeout=2) as db_conn:
|
|
||||||
db_conn.row_factory = sqlite3.Row
|
|
||||||
query = "SELECT album_art FROM tracks WHERE id = ?"
|
|
||||||
query_params = (track_id,)
|
|
||||||
|
|
||||||
if file_path and not track_id:
|
|
||||||
query = "SELECT album_art FROM tracks WHERE file_path = ?"
|
|
||||||
query_params = (file_path,)
|
|
||||||
|
|
||||||
async with await db_conn.execute(query,
|
|
||||||
query_params) as db_cursor:
|
|
||||||
result = await db_cursor.fetchone()
|
|
||||||
if not result:
|
|
||||||
return
|
|
||||||
return result['album_art']
|
|
||||||
except:
|
|
||||||
traceback.print_exc()
|
|
||||||
return
|
|
||||||
|
|
||||||
async def _get_album_art(self, track_id: Optional[int] = None, file_path: Optional[str] = None) -> bytes|None:
|
|
||||||
try:
|
|
||||||
if not file_path:
|
|
||||||
file_path = self.now_playing.get('file_path')
|
|
||||||
|
|
||||||
if not file_path:
|
|
||||||
logging.critical("_get_album_art:: No current file")
|
|
||||||
return
|
|
||||||
original_file_path = file_path
|
|
||||||
file_path = file_path.replace("/paul/toons/",
|
|
||||||
"/singer/gogs_toons/")
|
|
||||||
cached_album_art = await self.get_album_art(file_path=original_file_path,
|
|
||||||
track_id=track_id)
|
|
||||||
if cached_album_art:
|
|
||||||
return cached_album_art
|
|
||||||
except:
|
|
||||||
traceback.print_exc()
|
|
||||||
|
|
||||||
# TODO: Optimize/cache
|
|
||||||
async def album_art_handler(self, request: Request, track_id: Optional[int] = None) -> bytes:
|
async def album_art_handler(self, request: Request, track_id: Optional[int] = None) -> bytes:
|
||||||
|
"""
|
||||||
|
Get album art, optional parameter track_id may be specified.
|
||||||
|
Otherwise, current track album art will be pulled.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
logging.debug("Seeking album art with trackId: %s", track_id)
|
logging.debug("Seeking album art with trackId: %s", track_id)
|
||||||
album_art = await self._get_album_art(track_id=track_id)
|
album_art: Optional[bytes] = await self.radio_util._get_album_art(track_id=track_id)
|
||||||
if not album_art:
|
if not album_art:
|
||||||
return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg",
|
return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg",
|
||||||
status_code=302)
|
status_code=302)
|
||||||
@ -330,16 +149,14 @@ class Radio(FastAPI):
|
|||||||
status_code=302)
|
status_code=302)
|
||||||
|
|
||||||
async def radio_now_playing(self, request: Request) -> dict:
|
async def radio_now_playing(self, request: Request) -> dict:
|
||||||
ret_obj = {**self.now_playing}
|
ret_obj: dict = {**self.radio_util.now_playing}
|
||||||
cur_elapsed = self.now_playing.get('elapsed', -1)
|
cur_elapsed: int = self.radio_util.now_playing.get('elapsed', -1)
|
||||||
cur_duration = self.now_playing.get('duration', 999999)
|
cur_duration: int = self.radio_util.now_playing.get('duration', 999999)
|
||||||
try:
|
try:
|
||||||
ret_obj['elapsed'] = int(time.time()) - ret_obj['start']
|
ret_obj['elapsed'] = int(time.time()) - ret_obj['start']
|
||||||
except KeyError:
|
except KeyError:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
ret_obj['elapsed'] = 0
|
ret_obj['elapsed'] = 0
|
||||||
elapsed = ret_obj['elapsed']
|
|
||||||
duration = ret_obj['duration']
|
|
||||||
ret_obj.pop('file_path')
|
ret_obj.pop('file_path')
|
||||||
return ret_obj
|
return ret_obj
|
||||||
|
|
||||||
@ -348,36 +165,31 @@ class Radio(FastAPI):
|
|||||||
background_tasks: BackgroundTasks) -> Optional[dict]:
|
background_tasks: BackgroundTasks) -> Optional[dict]:
|
||||||
"""
|
"""
|
||||||
Get next track
|
Get next track
|
||||||
Args:
|
Track will be removed from the queue in the process.
|
||||||
None
|
|
||||||
Returns:
|
|
||||||
str: Next track in queue
|
|
||||||
|
|
||||||
Track will be removed from the queue in the process (pop from top of list).
|
|
||||||
"""
|
"""
|
||||||
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
if not isinstance(self.active_playlist, list) or not self.active_playlist:
|
if not isinstance(self.radio_util.active_playlist, list) or not self.radio_util.active_playlist:
|
||||||
await self.load_playlist()
|
await self.radio_util.load_playlist()
|
||||||
await self._ls_skip()
|
await self.radio_util._ls_skip()
|
||||||
return
|
return
|
||||||
next = self.active_playlist.pop(0)
|
next = self.radio_util.active_playlist.pop(0)
|
||||||
if not isinstance(next, dict):
|
if not isinstance(next, dict):
|
||||||
logging.critical("next is of type: %s, reloading playlist...", type(next))
|
logging.critical("next is of type: %s, reloading playlist...", type(next))
|
||||||
await self.load_playlist()
|
await self.radio_util.load_playlist()
|
||||||
await self._ls_skip()
|
await self.radio_util._ls_skip()
|
||||||
return
|
return
|
||||||
|
|
||||||
duration = next['duration']
|
duration: int = next['duration']
|
||||||
time_started = int(time.time())
|
time_started: int = int(time.time())
|
||||||
time_ends = int(time_started + duration)
|
time_ends: int = int(time_started + duration)
|
||||||
|
|
||||||
if len(self.active_playlist) > 1:
|
if len(self.radio_util.active_playlist) > 1:
|
||||||
self.active_playlist.append(next) # Push to end of playlist
|
self.radio_util.active_playlist.append(next) # Push to end of playlist
|
||||||
else:
|
else:
|
||||||
await self.load_playlist()
|
await self.radio_util.load_playlist()
|
||||||
|
|
||||||
self.now_playing = next
|
self.radio_util.now_playing: dict = next
|
||||||
next['start'] = time_started
|
next['start'] = time_started
|
||||||
next['end'] = time_ends
|
next['end'] = time_ends
|
||||||
try:
|
try:
|
||||||
@ -385,9 +197,9 @@ class Radio(FastAPI):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
try:
|
try:
|
||||||
if not await self.get_album_art(file_path=next['file_path']):
|
if not await self.radio_util.get_album_art(file_path=next['file_path']):
|
||||||
album_art = await self._get_album_art(next['file_path'])
|
album_art = await self.radio_util._get_album_art(next['file_path'])
|
||||||
await self.cache_album_art(next['id'], album_art)
|
await self.radio_util.cache_album_art(next['id'], album_art)
|
||||||
except:
|
except:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
return next
|
return next
|
||||||
@ -396,9 +208,9 @@ class Radio(FastAPI):
|
|||||||
async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> Response:
|
async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> Response:
|
||||||
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
artistsong = data.artistsong
|
artistsong: str = data.artistsong
|
||||||
artist = data.artist
|
artist: str = data.artist
|
||||||
song = data.song
|
song: str = data.song
|
||||||
if artistsong and (artist or song):
|
if artistsong and (artist or song):
|
||||||
return {
|
return {
|
||||||
'err': True,
|
'err': True,
|
||||||
@ -410,9 +222,11 @@ class Radio(FastAPI):
|
|||||||
'errorText': 'Invalid request',
|
'errorText': 'Invalid request',
|
||||||
}
|
}
|
||||||
|
|
||||||
search = await self.search_playlist(artistsong=artistsong,
|
search: bool = await self.radio_util.search_playlist(artistsong=artistsong,
|
||||||
artist=artist,
|
artist=artist,
|
||||||
song=song)
|
song=song)
|
||||||
if data.alsoSkip:
|
if data.alsoSkip:
|
||||||
await self._ls_skip()
|
await self.radio_util._ls_skip()
|
||||||
return {'result': search}
|
return {
|
||||||
|
'result': search
|
||||||
|
}
|
@ -7,15 +7,40 @@ Radio Utils
|
|||||||
import logging
|
import logging
|
||||||
import traceback
|
import traceback
|
||||||
import time
|
import time
|
||||||
|
import regex
|
||||||
import datetime
|
import datetime
|
||||||
|
import os
|
||||||
import gpt
|
import gpt
|
||||||
from aiohttp import ClientSession, ClientTimeout
|
from aiohttp import ClientSession, ClientTimeout
|
||||||
|
import aiosqlite as sqlite3
|
||||||
|
from typing import Optional, LiteralString
|
||||||
|
from uuid import uuid4 as uuid
|
||||||
|
from .constructors import RadioException
|
||||||
|
|
||||||
|
double_space = regex.compile(r'\s{2,}')
|
||||||
|
|
||||||
class RadioUtil:
|
class RadioUtil:
|
||||||
def __init__(self, constants) -> None:
|
def __init__(self, constants) -> None:
|
||||||
self.constants = constants
|
self.constants = constants
|
||||||
self.gpt = gpt.GPT(self.constants)
|
self.gpt = gpt.GPT(self.constants)
|
||||||
self.webhooks = {
|
self.ls_uri: str = "http://10.10.10.101:29000"
|
||||||
|
self.sqlite_exts: list[str] = ['/home/singer/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so']
|
||||||
|
self.active_playlist_path: str|LiteralString = os.path.join("/usr/local/share",
|
||||||
|
"sqlite_dbs", "track_file_map.db")
|
||||||
|
self.active_playlist_name = "default" # not used
|
||||||
|
self.active_playlist: list = []
|
||||||
|
self.now_playing: dict = {
|
||||||
|
'artist': 'N/A',
|
||||||
|
'song': 'N/A',
|
||||||
|
'genre': 'N/A',
|
||||||
|
'artistsong': 'N/A - N/A',
|
||||||
|
'duration': 0,
|
||||||
|
'start': 0,
|
||||||
|
'end': 0,
|
||||||
|
'file_path': None,
|
||||||
|
'id': None,
|
||||||
|
}
|
||||||
|
self.webhooks: dict = {
|
||||||
'gpt': {
|
'gpt': {
|
||||||
'hook': self.constants.GPT_WEBHOOK,
|
'hook': self.constants.GPT_WEBHOOK,
|
||||||
},
|
},
|
||||||
@ -23,6 +48,7 @@ class RadioUtil:
|
|||||||
'hook': self.constants.SFM_WEBHOOK,
|
'hook': self.constants.SFM_WEBHOOK,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def duration_conv(self, s: int|float) -> str:
|
def duration_conv(self, s: int|float) -> str:
|
||||||
"""
|
"""
|
||||||
Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
|
Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
|
||||||
@ -33,7 +59,168 @@ class RadioUtil:
|
|||||||
"""
|
"""
|
||||||
return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]
|
return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]
|
||||||
|
|
||||||
async def get_ai_song_info(self, artist: str, song: str) -> str|None:
|
async def search_playlist(self, artistsong: str|None = None, artist: str|None = None, song: str|None = None) -> bool:
|
||||||
|
if artistsong and (artist or song):
|
||||||
|
raise RadioException("Cannot search using combination provided")
|
||||||
|
if not artistsong and (not artist or not song):
|
||||||
|
raise RadioException("No query provided")
|
||||||
|
try:
|
||||||
|
search_artist: Optional[str] = None
|
||||||
|
search_song: Optional[str] = None
|
||||||
|
search_query: str = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, genre, file_path, duration FROM tracks\
|
||||||
|
WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
|
||||||
|
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
|
||||||
|
if artistsong:
|
||||||
|
artistsong_split: list = artistsong.split(" - ", maxsplit=1)
|
||||||
|
(search_artist, search_song) = tuple(artistsong_split)
|
||||||
|
else:
|
||||||
|
search_artist: str = artist
|
||||||
|
search_song: str = song
|
||||||
|
if not artistsong:
|
||||||
|
artistsong: str = f"{search_artist} - {search_song}"
|
||||||
|
search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)
|
||||||
|
async with sqlite3.connect(self.active_playlist_path,
|
||||||
|
timeout=2) as db_conn:
|
||||||
|
await db_conn.enable_load_extension(True)
|
||||||
|
for ext in self.sqlite_exts:
|
||||||
|
await db_conn.load_extension(ext)
|
||||||
|
db_conn.row_factory = sqlite3.Row
|
||||||
|
async with await db_conn.execute(search_query, search_params) as db_cursor:
|
||||||
|
result: Optional[sqlite3.Row|bool] = await db_cursor.fetchone()
|
||||||
|
if not result:
|
||||||
|
return False
|
||||||
|
pushObj: dict = {
|
||||||
|
'id': result['id'],
|
||||||
|
'uuid': str(uuid().hex),
|
||||||
|
'artist': result['artist'].strip(),
|
||||||
|
'song': result['song'].strip(),
|
||||||
|
'artistsong': result['artistsong'].strip(),
|
||||||
|
'genre': result['genre'],
|
||||||
|
'file_path': result['file_path'],
|
||||||
|
'duration': result['duration'],
|
||||||
|
}
|
||||||
|
self.active_playlist.insert(0, pushObj)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logging.critical("search_playlist:: Search error occurred: %s", str(e))
|
||||||
|
traceback.print_exc()
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def load_playlist(self):
|
||||||
|
try:
|
||||||
|
logging.info(f"Loading playlist...")
|
||||||
|
self.active_playlist.clear()
|
||||||
|
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
|
||||||
|
# GROUP BY artistdashsong ORDER BY RANDOM()'
|
||||||
|
|
||||||
|
"""
|
||||||
|
LIMITED GENRES
|
||||||
|
"""
|
||||||
|
|
||||||
|
db_query: str = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, genre, file_path, duration FROM tracks\
|
||||||
|
WHERE genre IN ("metalcore", "pop punk", "punk rock", "metal", "punk", "electronic", "nu metal", "EDM",\
|
||||||
|
"post-hardcore", "pop rock", "experimental", "post-punk", "death metal", "electronicore", "hard rock", "psychedelic rock",\
|
||||||
|
"grunge", "house", "dubstep", "hardcore", "hair metal", "horror punk", "folk punk", "breakcore",\
|
||||||
|
"post-rock", "deathcore", "hardcore punk", "synthwave", "trap") GROUP BY artistdashsong ORDER BY RANDOM()'
|
||||||
|
|
||||||
|
async with sqlite3.connect(self.active_playlist_path,
|
||||||
|
timeout=2) as db_conn:
|
||||||
|
db_conn.row_factory = sqlite3.Row
|
||||||
|
async with await db_conn.execute(db_query) as db_cursor:
|
||||||
|
results: Optional[list[sqlite3.Row]] = await db_cursor.fetchall()
|
||||||
|
self.active_playlist: list[dict] = [{
|
||||||
|
'uuid': str(uuid().hex),
|
||||||
|
'id': r['id'],
|
||||||
|
'artist': double_space.sub(' ', r['artist']).strip(),
|
||||||
|
'song': double_space.sub(' ', r['song']).strip(),
|
||||||
|
'genre': r['genre'] if r['genre'] else 'Unknown',
|
||||||
|
'artistsong': double_space.sub(' ', r['artistdashsong']).strip(),
|
||||||
|
'file_path': r['file_path'],
|
||||||
|
'duration': r['duration'],
|
||||||
|
} for r in results]
|
||||||
|
logging.info("Populated active playlists with %s items",
|
||||||
|
len(self.active_playlist))
|
||||||
|
except:
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
async def cache_album_art(self, track_id: int, album_art: bytes) -> None:
|
||||||
|
try:
|
||||||
|
async with sqlite3.connect(self.active_playlist_path,
|
||||||
|
timeout=2) as db_conn:
|
||||||
|
async with await db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?",
|
||||||
|
(album_art, track_id,)) as db_cursor:
|
||||||
|
await db_conn.commit()
|
||||||
|
except:
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
async def get_album_art(self, track_id: Optional[int] = None,
|
||||||
|
file_path: Optional[str] = None) -> bytes:
|
||||||
|
try:
|
||||||
|
async with sqlite3.connect(self.active_playlist_path,
|
||||||
|
timeout=2) as db_conn:
|
||||||
|
db_conn.row_factory = sqlite3.Row
|
||||||
|
query: str = "SELECT album_art FROM tracks WHERE id = ?"
|
||||||
|
query_params: tuple = (track_id,)
|
||||||
|
|
||||||
|
if file_path and not track_id:
|
||||||
|
query: str = "SELECT album_art FROM tracks WHERE file_path = ?"
|
||||||
|
query_params: tuple = (file_path,)
|
||||||
|
|
||||||
|
async with await db_conn.execute(query,
|
||||||
|
query_params) as db_cursor:
|
||||||
|
result: Optional[sqlite3.Row|bool] = await db_cursor.fetchone()
|
||||||
|
if not result:
|
||||||
|
return
|
||||||
|
return result['album_art']
|
||||||
|
except:
|
||||||
|
traceback.print_exc()
|
||||||
|
return
|
||||||
|
|
||||||
|
async def _get_album_art(self, track_id: Optional[int] = None, file_path: Optional[str] = None) -> Optional[bytes]:
|
||||||
|
try:
|
||||||
|
if not file_path:
|
||||||
|
file_path: Optional[str] = self.now_playing.get('file_path')
|
||||||
|
|
||||||
|
if not file_path:
|
||||||
|
logging.critical("_get_album_art:: No current file")
|
||||||
|
return
|
||||||
|
original_file_path: Optional[str] = file_path
|
||||||
|
file_path: Optional[str] = file_path.replace("/paul/toons/",
|
||||||
|
"/singer/gogs_toons/")
|
||||||
|
cached_album_art: Optional[bytes|bool] = await self.get_album_art(file_path=original_file_path,
|
||||||
|
track_id=track_id)
|
||||||
|
if cached_album_art:
|
||||||
|
return cached_album_art
|
||||||
|
except:
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
def get_queue_item_by_uuid(self, uuid: str) -> Optional[tuple[int, dict]]:
|
||||||
|
"""
|
||||||
|
Get queue item by UUID
|
||||||
|
Args:
|
||||||
|
uuid: The UUID to search
|
||||||
|
Returns:
|
||||||
|
dict|None
|
||||||
|
"""
|
||||||
|
for x, item in enumerate(self.active_playlist):
|
||||||
|
if item.get('uuid') == uuid:
|
||||||
|
return (x, item)
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def _ls_skip(self) -> bool:
|
||||||
|
try:
|
||||||
|
async with ClientSession() as session:
|
||||||
|
async with session.get(f"{self.ls_uri}/next",
|
||||||
|
timeout=ClientTimeout(connect=2, sock_read=2)) as request:
|
||||||
|
request.raise_for_status()
|
||||||
|
text: Optional[str] = await request.text()
|
||||||
|
return text == "OK"
|
||||||
|
except Exception as e:
|
||||||
|
logging.debug("Skip failed: %s", str(e))
|
||||||
|
|
||||||
|
return False # failsafe
|
||||||
|
|
||||||
|
async def get_ai_song_info(self, artist: str, song: str) -> Optional[str]:
|
||||||
"""
|
"""
|
||||||
Get AI Song Info
|
Get AI Song Info
|
||||||
Args:
|
Args:
|
||||||
@ -42,7 +229,7 @@ class RadioUtil:
|
|||||||
Returns:
|
Returns:
|
||||||
str|None
|
str|None
|
||||||
"""
|
"""
|
||||||
response = await self.gpt.get_completion(prompt=f"I am going to listen to {song} by {artist}.")
|
response: Optional[str] = await self.gpt.get_completion(prompt=f"I am going to listen to {song} by {artist}.")
|
||||||
if not response:
|
if not response:
|
||||||
logging.critical("No response received from GPT?")
|
logging.critical("No response received from GPT?")
|
||||||
return
|
return
|
||||||
@ -58,10 +245,11 @@ class RadioUtil:
|
|||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
# First, send track info
|
# First, send track info
|
||||||
friendly_track_start = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['start']))
|
friendly_track_start: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['start']))
|
||||||
friendly_track_end = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['end']))
|
friendly_track_end: str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(track['end']))
|
||||||
hook_data = {
|
hook_data: dict = {
|
||||||
'username': 'serious.FM',
|
'username': 'serious.FM',
|
||||||
"embeds": [{
|
"embeds": [{
|
||||||
"title": "Now Playing",
|
"title": "Now Playing",
|
||||||
@ -95,7 +283,7 @@ class RadioUtil:
|
|||||||
}]
|
}]
|
||||||
}
|
}
|
||||||
|
|
||||||
sfm_hook = self.webhooks['sfm'].get('hook')
|
sfm_hook: str = self.webhooks['sfm'].get('hook')
|
||||||
async with ClientSession() as session:
|
async with ClientSession() as session:
|
||||||
async with await session.post(sfm_hook, json=hook_data,
|
async with await session.post(sfm_hook, json=hook_data,
|
||||||
timeout=ClientTimeout(connect=5, sock_read=5), headers={
|
timeout=ClientTimeout(connect=5, sock_read=5), headers={
|
||||||
@ -104,9 +292,12 @@ class RadioUtil:
|
|||||||
|
|
||||||
# Next, AI feedback
|
# Next, AI feedback
|
||||||
|
|
||||||
ai_response = await self.get_ai_song_info(track['artist'],
|
ai_response: Optional[str] = await self.get_ai_song_info(track['artist'],
|
||||||
track['song'])
|
track['song'])
|
||||||
hook_data = {
|
if not ai_response:
|
||||||
|
return
|
||||||
|
|
||||||
|
hook_data: dict = {
|
||||||
'username': 'GPT',
|
'username': 'GPT',
|
||||||
"embeds": [{
|
"embeds": [{
|
||||||
"title": "AI Feedback",
|
"title": "AI Feedback",
|
||||||
@ -115,7 +306,7 @@ class RadioUtil:
|
|||||||
}]
|
}]
|
||||||
}
|
}
|
||||||
|
|
||||||
ai_hook = self.webhooks['gpt'].get('hook')
|
ai_hook: str = self.webhooks['gpt'].get('hook')
|
||||||
async with ClientSession() as session:
|
async with ClientSession() as session:
|
||||||
async with await session.post(ai_hook, json=hook_data,
|
async with await session.post(ai_hook, json=hook_data,
|
||||||
timeout=ClientTimeout(connect=5, sock_read=5), headers={
|
timeout=ClientTimeout(connect=5, sock_read=5), headers={
|
||||||
|
@ -23,7 +23,7 @@ class RandMsg(FastAPI):
|
|||||||
Get a randomly generated message
|
Get a randomly generated message
|
||||||
"""
|
"""
|
||||||
random.seed()
|
random.seed()
|
||||||
short = data.short if data else False
|
short: bool = data.short if data else False
|
||||||
if not short:
|
if not short:
|
||||||
db_rand_selected = random.choice([0, 1, 3])
|
db_rand_selected = random.choice([0, 1, 3])
|
||||||
else:
|
else:
|
||||||
|
@ -3,17 +3,18 @@
|
|||||||
import os
|
import os
|
||||||
import aiosqlite as sqlite3
|
import aiosqlite as sqlite3
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
from typing import Optional, LiteralString
|
||||||
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
|
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
|
||||||
|
|
||||||
class Transcriptions(FastAPI):
|
class Transcriptions(FastAPI):
|
||||||
"""Transcription Endpoints"""
|
"""Transcription Endpoints"""
|
||||||
def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
|
def __init__(self, app: FastAPI, util, constants, glob_state) -> None: # pylint: disable=super-init-not-called
|
||||||
self.app = app
|
self.app = app
|
||||||
self.util = util
|
self.util = util
|
||||||
self.constants = constants
|
self.constants = constants
|
||||||
self.glob_state = glob_state
|
self.glob_state = glob_state
|
||||||
|
|
||||||
self.endpoints = {
|
self.endpoints: dict = {
|
||||||
"transcriptions/get_episodes": self.get_episodes_handler,
|
"transcriptions/get_episodes": self.get_episodes_handler,
|
||||||
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
|
"transcriptions/get_episode_lines": self.get_episode_lines_handler,
|
||||||
#tbd
|
#tbd
|
||||||
@ -23,15 +24,13 @@ class Transcriptions(FastAPI):
|
|||||||
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
|
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
|
||||||
include_in_schema=False)
|
include_in_schema=False)
|
||||||
|
|
||||||
async def get_episodes_handler(self, data: ValidShowEpisodeListRequest):
|
async def get_episodes_handler(self, data: ValidShowEpisodeListRequest) -> dict:
|
||||||
"""
|
"""Get list of episodes by show id"""
|
||||||
/transcriptions/get_episodes
|
|
||||||
Get list of episodes by show id
|
show_id: int = data.s
|
||||||
"""
|
db_path: Optional[str|LiteralString] = None
|
||||||
show_id = data.s
|
db_query: Optional[str] = None
|
||||||
db_path = None
|
show_title: Optional[str] = None
|
||||||
db_query = None
|
|
||||||
show_title = None
|
|
||||||
|
|
||||||
if show_id is None:
|
if show_id is None:
|
||||||
return {
|
return {
|
||||||
@ -39,7 +38,7 @@ class Transcriptions(FastAPI):
|
|||||||
'errorText': 'Invalid request'
|
'errorText': 'Invalid request'
|
||||||
}
|
}
|
||||||
|
|
||||||
show_id = int(show_id)
|
show_id: int = int(show_id)
|
||||||
|
|
||||||
if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
|
if not(str(show_id).isnumeric()) or show_id not in [0, 1, 2]:
|
||||||
return {
|
return {
|
||||||
@ -49,20 +48,20 @@ class Transcriptions(FastAPI):
|
|||||||
|
|
||||||
match show_id:
|
match show_id:
|
||||||
case 0:
|
case 0:
|
||||||
db_path = os.path.join("/", "usr", "local", "share",
|
db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
|
||||||
"sqlite_dbs", "sp.db")
|
"sqlite_dbs", "sp.db")
|
||||||
db_query = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
|
db_query: str = """SELECT DISTINCT(("S" || Season || "E" || Episode || " " || Title)), ID FROM SP_DAT ORDER BY Season, Episode"""
|
||||||
show_title = "South Park"
|
show_title: str = "South Park"
|
||||||
case 1:
|
case 1:
|
||||||
db_path = os.path.join("/", "usr", "local", "share",
|
db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
|
||||||
"sqlite_dbs", "futur.db")
|
"sqlite_dbs", "futur.db")
|
||||||
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
|
db_query: str = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
|
||||||
show_title = "Futurama"
|
show_title: str = "Futurama"
|
||||||
case 2:
|
case 2:
|
||||||
db_path = os.path.join("/", "usr", "local", "share",
|
db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
|
||||||
"sqlite_dbs", "parks.db")
|
"sqlite_dbs", "parks.db")
|
||||||
db_query = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
|
db_query: str = """SELECT DISTINCT(("S" || EP_S || "E" || EP_EP || " " || EP_TITLE)), EP_ID FROM clean_dialog ORDER BY EP_S, EP_EP"""
|
||||||
show_title = "Parks And Rec"
|
show_title: str = "Parks And Rec"
|
||||||
case _:
|
case _:
|
||||||
return {
|
return {
|
||||||
'err': True,
|
'err': True,
|
||||||
@ -71,7 +70,7 @@ class Transcriptions(FastAPI):
|
|||||||
await self.glob_state.increment_counter('transcript_list_requests')
|
await self.glob_state.increment_counter('transcript_list_requests')
|
||||||
async with sqlite3.connect(database=db_path, timeout=1) as _db:
|
async with sqlite3.connect(database=db_path, timeout=1) as _db:
|
||||||
async with await _db.execute(db_query) as _cursor:
|
async with await _db.execute(db_query) as _cursor:
|
||||||
result = await _cursor.fetchall()
|
result: list[tuple] = await _cursor.fetchall()
|
||||||
return {
|
return {
|
||||||
"show_title": show_title,
|
"show_title": show_title,
|
||||||
"episodes": [
|
"episodes": [
|
||||||
@ -81,26 +80,24 @@ class Transcriptions(FastAPI):
|
|||||||
} for item in result]
|
} for item in result]
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest):
|
async def get_episode_lines_handler(self, data: ValidShowEpisodeLineRequest) -> dict:
|
||||||
"""/transcriptions/get_episode_lines
|
"""Get lines for a particular episode"""
|
||||||
Get lines for a particular episode
|
show_id: int = data.s
|
||||||
"""
|
episode_id: int = data.e
|
||||||
show_id = data.s
|
|
||||||
episode_id = data.e
|
|
||||||
# pylint: disable=line-too-long
|
# pylint: disable=line-too-long
|
||||||
match show_id:
|
match show_id:
|
||||||
case 0:
|
case 0:
|
||||||
db_path = os.path.join("/", "usr", "local", "share",
|
db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
|
||||||
"sqlite_dbs", "sp.db")
|
"sqlite_dbs", "sp.db")
|
||||||
db_query = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
|
db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
|
||||||
case 1:
|
case 1:
|
||||||
db_path = os.path.join("/", "usr", "local", "share",
|
db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
|
||||||
"sqlite_dbs", "futur.db")
|
"sqlite_dbs", "futur.db")
|
||||||
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
|
db_query: str = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
|
||||||
case 2:
|
case 2:
|
||||||
db_path = os.path.join("/", "usr", "local", "share",
|
db_path: str|LiteralString = os.path.join("/", "usr", "local", "share",
|
||||||
"sqlite_dbs", "parks.db")
|
"sqlite_dbs", "parks.db")
|
||||||
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
|
db_query: str = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY id ASC"""
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
return {
|
return {
|
||||||
@ -110,10 +107,10 @@ class Transcriptions(FastAPI):
|
|||||||
|
|
||||||
await self.glob_state.increment_counter('transcript_requests')
|
await self.glob_state.increment_counter('transcript_requests')
|
||||||
async with sqlite3.connect(database=db_path, timeout=1) as _db:
|
async with sqlite3.connect(database=db_path, timeout=1) as _db:
|
||||||
params = (episode_id,)
|
params: tuple = (episode_id,)
|
||||||
async with await _db.execute(db_query, params) as _cursor:
|
async with await _db.execute(db_query, params) as _cursor:
|
||||||
result = await _cursor.fetchall()
|
result: list[tuple] = await _cursor.fetchall()
|
||||||
first_result = result[0]
|
first_result: tuple = result[0]
|
||||||
return {
|
return {
|
||||||
'episode_id': episode_id,
|
'episode_id': episode_id,
|
||||||
'ep_friendly': first_result[0].strip(),
|
'ep_friendly': first_result[0].strip(),
|
||||||
|
@ -1,21 +1,22 @@
|
|||||||
#!/usr/bin/env python3.12
|
#!/usr/bin/env python3.12
|
||||||
# pylint: disable=invalid-name
|
|
||||||
|
|
||||||
|
import logging
|
||||||
from fastapi import FastAPI, Request, HTTPException
|
from fastapi import FastAPI, Request, HTTPException
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from aiohttp import ClientSession, ClientTimeout
|
from aiohttp import ClientSession, ClientTimeout
|
||||||
from .constructors import ValidXCRequest
|
from .constructors import ValidXCRequest
|
||||||
|
# pylint: disable=invalid-name
|
||||||
|
|
||||||
class XC(FastAPI):
|
class XC(FastAPI):
|
||||||
"""XC (CrossComm) Endpoints"""
|
"""XC (CrossComm) Endpoints"""
|
||||||
def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
|
def __init__(self, app: FastAPI, util, constants, glob_state) -> None: # pylint: disable=super-init-not-called
|
||||||
self.app = app
|
self.app = app
|
||||||
self.util = util
|
self.util = util
|
||||||
self.constants = constants
|
self.constants = constants
|
||||||
self.glob_state = glob_state
|
self.glob_state = glob_state
|
||||||
|
|
||||||
|
|
||||||
self.endpoints = {
|
self.endpoints: dict = {
|
||||||
"xc": self.xc_handler,
|
"xc": self.xc_handler,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -23,25 +24,18 @@ class XC(FastAPI):
|
|||||||
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
|
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
|
||||||
include_in_schema=False)
|
include_in_schema=False)
|
||||||
|
|
||||||
# async def put_ws_handler(self, ws: WebSocket):
|
async def xc_handler(self, data: ValidXCRequest, request: Request) -> dict:
|
||||||
# await ws.accept()
|
"""Handle XC Commands"""
|
||||||
# await self.audio_streamer.handle_client(ws)
|
|
||||||
|
|
||||||
async def xc_handler(self, data: ValidXCRequest, request: Request):
|
|
||||||
"""
|
|
||||||
/xc
|
|
||||||
Handle XC Commands
|
|
||||||
"""
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
key = data.key
|
key: str = data.key
|
||||||
bid = data.bid
|
bid: int = data.bid
|
||||||
cmd = data.cmd
|
cmd: str = data.cmd
|
||||||
cmd_data = data.data
|
cmd_data: dict = data.data
|
||||||
if not self.util.check_key(path=request.url.path, req_type=0, key=key):
|
if not self.util.check_key(path=request.url.path, req_type=0, key=key):
|
||||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||||
|
|
||||||
BID_ADDR_MAP = {
|
BID_ADDR_MAP: dict = {
|
||||||
0: '10.10.10.101:5991', # Patrick (a.k.a. Thomas a.k.a. Aces)
|
0: '10.10.10.101:5991', # Patrick (a.k.a. Thomas a.k.a. Aces)
|
||||||
# TODO: add Havoc?
|
# TODO: add Havoc?
|
||||||
}
|
}
|
||||||
@ -52,15 +46,15 @@ class XC(FastAPI):
|
|||||||
'errorText': 'Invalid bot id'
|
'errorText': 'Invalid bot id'
|
||||||
}
|
}
|
||||||
|
|
||||||
bot_api_url = f'http://{BID_ADDR_MAP[bid]}/'
|
bot_api_url: str = f'http://{BID_ADDR_MAP[bid]}/'
|
||||||
async with ClientSession() as session:
|
async with ClientSession() as session:
|
||||||
async with await session.post(f"{bot_api_url}{cmd}", json=cmd_data, headers={
|
async with await session.post(f"{bot_api_url}{cmd}", json=cmd_data, headers={
|
||||||
'Content-Type': 'application/json; charset=utf-8'
|
'Content-Type': 'application/json; charset=utf-8'
|
||||||
}, timeout=ClientTimeout(connect=5, sock_read=5)) as request:
|
}, timeout=ClientTimeout(connect=5, sock_read=5)) as request:
|
||||||
response = await request.json()
|
response: dict = await request.json()
|
||||||
return {
|
return {
|
||||||
'success': True,
|
'success': True,
|
||||||
'response': response
|
'response': response
|
||||||
}
|
}
|
||||||
except:
|
except Exception as e:
|
||||||
pass
|
logging.debug("Error: %s", str(e))
|
@ -3,18 +3,19 @@
|
|||||||
import importlib
|
import importlib
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
from typing import Optional
|
||||||
from .constructors import ValidYTSearchRequest
|
from .constructors import ValidYTSearchRequest
|
||||||
|
|
||||||
class YT(FastAPI):
|
class YT(FastAPI):
|
||||||
"""YT Endpoints"""
|
"""YT Endpoints"""
|
||||||
def __init__(self, app: FastAPI, util, constants, glob_state): # pylint: disable=super-init-not-called
|
def __init__(self, app: FastAPI, util, constants, glob_state) -> None: # pylint: disable=super-init-not-called
|
||||||
self.app = app
|
self.app = app
|
||||||
self.util = util
|
self.util = util
|
||||||
self.constants = constants
|
self.constants = constants
|
||||||
self.glob_state = glob_state
|
self.glob_state = glob_state
|
||||||
self.ytsearch = importlib.import_module("youtube_search_async").YoutubeSearch()
|
self.ytsearch = importlib.import_module("youtube_search_async").YoutubeSearch()
|
||||||
|
|
||||||
self.endpoints = {
|
self.endpoints: dict = {
|
||||||
"yt/search": self.yt_video_search_handler,
|
"yt/search": self.yt_video_search_handler,
|
||||||
#tbd
|
#tbd
|
||||||
}
|
}
|
||||||
@ -23,15 +24,20 @@ class YT(FastAPI):
|
|||||||
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
|
app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
|
||||||
include_in_schema=True)
|
include_in_schema=True)
|
||||||
|
|
||||||
async def yt_video_search_handler(self, data: ValidYTSearchRequest):
|
async def yt_video_search_handler(self, data: ValidYTSearchRequest) -> dict:
|
||||||
"""
|
"""
|
||||||
Search for YT Video by Title (closest match returned)
|
Search for YT Video by Title (closest match returned)
|
||||||
- **t**: Title to search
|
- **t**: Title to search
|
||||||
"""
|
"""
|
||||||
|
|
||||||
title = data.t
|
title: str = data.t
|
||||||
yts_res = await self.ytsearch.search(title)
|
yts_res: Optional[list[dict]] = await self.ytsearch.search(title)
|
||||||
yt_video_id = yts_res[0].get('id', False)
|
if not yts_res:
|
||||||
|
return {
|
||||||
|
'err': True,
|
||||||
|
'errorText': 'No result.',
|
||||||
|
}
|
||||||
|
yt_video_id: str|bool = yts_res[0].get('id', False)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'video_id': yt_video_id,
|
'video_id': yt_video_id,
|
||||||
|
@ -47,7 +47,7 @@ class LastFM:
|
|||||||
'err': 'Failed'
|
'err': 'Failed'
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_track_info(self, artist=None, track=None):
|
async def get_track_info(self, artist=None, track=None) -> dict:
|
||||||
"""Get Track Info from LastFM"""
|
"""Get Track Info from LastFM"""
|
||||||
try:
|
try:
|
||||||
if artist is None or track is None:
|
if artist is None or track is None:
|
||||||
@ -74,7 +74,7 @@ class LastFM:
|
|||||||
'err': 'General Failure'
|
'err': 'General Failure'
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_album_tracklist(self, artist=None, album=None):
|
async def get_album_tracklist(self, artist=None, album=None) -> dict:
|
||||||
"""Get Album Tracklist"""
|
"""Get Album Tracklist"""
|
||||||
try:
|
try:
|
||||||
if artist is None or album is None:
|
if artist is None or album is None:
|
||||||
@ -98,7 +98,7 @@ class LastFM:
|
|||||||
'err': 'General Failure'
|
'err': 'General Failure'
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_artist_albums(self, artist=None):
|
async def get_artist_albums(self, artist=None) -> dict|list[dict]:
|
||||||
"""Get Artists Albums from LastFM"""
|
"""Get Artists Albums from LastFM"""
|
||||||
try:
|
try:
|
||||||
if artist is None:
|
if artist is None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user