Compare commits


61 Commits

SHA1 Message Date
3b74333b96 misc 2025-09-12 22:39:59 -04:00
f6d4ed57f3 misc 2025-09-09 15:50:13 -04:00
a57173b90a TRip: change file naming, use pigz for faster tarball creation 2025-08-29 10:23:06 -04:00
a11748775e TRip: capitalize RQ job statuses in related endpoints, order job list, other: minor/typing 2025-08-23 08:20:32 -04:00
a8d089c0fe minor 2025-08-21 15:35:10 -04:00
dd8d07b2f0 formatting 2025-08-21 15:08:13 -04:00
22eaa2260e another commit without a list of specific changes! (misc) 2025-08-21 15:06:56 -04:00
e0f64f6773 misc 2025-08-20 15:58:07 -04:00
81f79dea1e misc 2025-08-20 07:32:57 -04:00
3cebe14674 misc / TRip: folder structure / tar naming 2025-08-15 14:58:06 -04:00
27fa1f78ed misc 2025-08-15 14:15:13 -04:00
0cd4a71db2 formatting / RQ tuning 2025-08-15 13:39:27 -04:00
93050ec6cf misc / RQ bulk downloads for TRip 2025-08-15 13:31:15 -04:00
72a7734152 minor 2025-08-11 15:06:58 -04:00
4cbd0fb934 docstrings 2025-08-11 14:06:42 -04:00
0fc78b08e4 oops 2025-08-11 14:05:20 -04:00
f1401ee6bf oops 2025-08-11 14:04:22 -04:00
e5dc72ea1b change TRip to use StreamRip rather than Hifi-Tui due to bugs 2025-08-11 14:03:43 -04:00
957e2f1f08 Add CORS allowed origin: https://status.boatson.boats 2025-08-09 07:54:19 -04:00
b7433239a5 rm copypasta comment 2025-08-09 07:50:54 -04:00
fb1d48ab58 formatting / CORS changes 2025-08-09 07:48:07 -04:00
9e9748076b misc / tRIP - beginnings/work in progress 2025-08-07 11:47:57 -04:00
8603b11438 playlists have been stored to redis for faster retrieval; additional work needed (playlist management, typeahead, etc- to move away from SQLite) 2025-07-20 15:50:25 -04:00
c42ebbfe53 same/prior; method call missing data.station, added 2025-07-20 08:03:31 -04:00
3fc4dd4072 bugfix: forgot to add data.station to datatables_search call, only the main station's queue was searched 2025-07-19 22:21:09 -04:00
9ce16ba923 rewrite pending; for now, additional support for multi-station 2025-07-19 21:57:21 -04:00
85182b7d8c WIP: additional radio stations 2025-07-17 06:55:16 -04:00
fd300743c8 minor 2025-07-15 13:48:51 -04:00
a1f82036ff misc 2025-07-15 11:39:12 -04:00
c75abdfab2 rm comments 2025-07-01 13:03:53 -04:00
1d7589ffbd misc/formatting ++ resolve #33 (restructured radio DBs, combined genre into track_file_map, revised query w/ INNER JOIN) 2025-07-01 13:02:53 -04:00
c3f753a4f0 add basic rate limiting 2025-07-01 11:38:38 -04:00
1991e5b31b small improvements re: #33 2025-07-01 10:34:03 -04:00
0fe081597e base: add allowed CORS origins for localhost:4321 (dev purposes); lyric_search: change typeahead to return a maximum of 10 results, prev. 100 2025-06-22 07:54:32 -04:00
a6128c7647 constructor reversion - related commit @8f3d4bf181 2025-06-09 07:52:13 -04:00
8f3d4bf181 revert radio queue behavior 2025-06-09 07:15:57 -04:00
4cdd6d0c99 misc/migration related 2025-06-08 08:53:18 -04:00
68408c4796 share 2025-05-27 16:48:28 -04:00
e07a9dd7d2 minor 2025-05-21 07:28:42 -04:00
0d58ae2a96 meme/misc/rm karma 2025-05-20 11:14:08 -04:00
5c351a6e0f memes - change method used to check if images are already stored as PNG 2025-05-17 08:48:44 -04:00
2caa482a0d formatting/add meme endpoints 2025-05-17 08:07:38 -04:00
d944a32c62 add retries for genius/lrclib searches (tenacity) + reduce timeouts appropriately 2025-05-07 06:46:27 -04:00
f8cb2a4bea genius - increase timeouts, radio_util- webhook bugfix 2025-05-04 08:21:35 -04:00
afb680404c json.dumps not needed 2025-05-03 06:39:55 -04:00
3476dc9e64 radio: move db query to constants (gitignored), bugfix: naas 2025-05-03 06:31:19 -04:00
457d72c934 reformat 2025-05-01 15:55:43 -04:00
3d6f1006a9 reformat/naas 2025-05-01 15:54:27 -04:00
ad43db289a remove karma endpoints from schema + add webhook notification for debugging (failed lyrics searches, non-LRC) 2025-05-01 06:32:28 -04:00
8848d3a493 bugfix: datatables search for radio queue was returning incorrect queue positions for items once a filter/query was provided (numbering/index was based on the filtered resultset) 2025-04-27 08:27:08 -04:00
2a49a92bb2 misc 2025-04-26 22:01:25 -04:00
0b70d93d47 misc 2025-04-26 21:27:55 -04:00
6a43d32808 bugfix 2025-04-26 19:59:38 -04:00
58ba471b5e misc 2025-04-26 19:47:12 -04:00
6c29c6fede reformat / resolves #32 2025-04-26 17:17:42 -04:00
4c5d2b6943 swap threading for multiprocessing's ThreadPool (radio playlist load), aiosqlite -> sqlite3 standard lib as disk i/o is blocking regardless; changes related to #32 for radio queue pagination, more work needed 2025-04-26 12:01:45 -04:00
6502199b5d reformat 2025-04-22 16:24:58 -04:00
b9bdf31944 load playlist in separate thread to prevent blocking on startup 2025-04-22 16:24:00 -04:00
5d2de1471f small optimization for filtering 2025-04-22 15:49:32 -04:00
f18a9da4a0 misc/basic genre filtering 2025-04-22 15:31:26 -04:00
40fa51af36 created add_genres to allow batch add of artist/genre pairs, alongside add_genre for singletons 2025-04-22 15:04:46 -04:00
27 changed files with 2264 additions and 697 deletions

.gitignore

@@ -8,10 +8,25 @@ constants.py
tests.py
db_migrate.py
notifier.py
test_hifi.py
youtube*
playlist_creator.py
artist_genre_tag.py
pg_migrate_lyrics.py
uv.lock
pyproject.toml
mypy.ini
.python-version
get_next_track.py
endpoints/radio.py
utils/radio_util.py
redis_playlist.py
endpoints/auth.py
endpoints/radio2
endpoints/radio2/**
hash_password.py
up.py
job_review.py
check_missing.py
**/auth/*
.gitignore

base.py

@@ -9,8 +9,10 @@ from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from lyric_search.sources import redis_cache
logging.basicConfig(level=logging.INFO)
logging.getLogger("aiosqlite").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
loop = asyncio.get_event_loop()
app = FastAPI(
@@ -21,20 +23,27 @@ app = FastAPI(
loop=loop,
)
constants = importlib.import_module("constants").Constants()
util = importlib.import_module("util").Utilities(app, constants)
origins = ["https://codey.lol", "https://api.codey.lol"]
origins = [
"https://codey.lol",
"https://old.codey.lol",
"https://api.codey.lol",
"https://status.boatson.boats",
"https://_new.codey.lol",
"http://localhost:4321",
]
app.add_middleware(
CORSMiddleware, # type: ignore
allow_origins=origins,
allow_credentials=True,
allow_methods=["POST", "GET", "HEAD"],
allow_methods=["POST", "GET", "HEAD", "OPTIONS"],
allow_headers=["*"],
) # type: ignore
"""
Blacklisted routes
"""
@@ -53,7 +62,10 @@ def base_head():
@app.get("/{path}", include_in_schema=False)
def disallow_get_any(request: Request, var: Any = None):
path = request.path_params["path"]
if not (isinstance(path, str) and path.split("/", maxsplit=1) == "widget"):
if not (
isinstance(path, str)
and (path.split("/", maxsplit=1) == "widget" or path == "misc/no")
):
return util.get_blocked_response()
else:
logging.info("OK, %s", path)
@@ -84,9 +96,12 @@ routes: dict = {
),
"lastfm": importlib.import_module("endpoints.lastfm").LastFM(app, util, constants),
"yt": importlib.import_module("endpoints.yt").YT(app, util, constants),
"karma": importlib.import_module("endpoints.karma").Karma(app, util, constants),
"radio": importlib.import_module("endpoints.radio").Radio(app, util, constants),
"mgr": importlib.import_module("endpoints.mgr.mgr_test").Mgr(app, util, constants),
"radio": importlib.import_module("endpoints.radio").Radio(
app, util, constants, loop
),
"meme": importlib.import_module("endpoints.meme").Meme(app, util, constants),
"trip": importlib.import_module("endpoints.rip").RIP(app, util, constants),
"auth": importlib.import_module("endpoints.auth").Auth(app),
}
# Misc endpoint depends on radio endpoint instance
@@ -105,5 +120,12 @@ End Actionable Routes
Startup
"""
async def on_start():
uvicorn_access_logger = logging.getLogger("uvicorn.access")
uvicorn_access_logger.disabled = True
app.add_event_handler("startup", on_start)
redis = redis_cache.RedisCache()
loop.create_task(redis.create_index())

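Aside: base.py's routes dict instantiates each endpoint class from its module at startup, and every class wires its own routes onto the shared app. A minimal standalone sketch of that pattern, with hypothetical specs and the util/constants arguments stubbed out:

import importlib
from fastapi import FastAPI

app = FastAPI()

# Illustrative only; the real dict passes util/constants (and loop, for radio).
route_specs = {
    "lastfm": ("endpoints.lastfm", "LastFM"),
    "yt": ("endpoints.yt", "YT"),
}

routes = {
    name: getattr(importlib.import_module(module), cls)(app, None, None)
    for name, (module, cls) in route_specs.items()
}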

@@ -1,39 +1,8 @@
from typing import Optional
from typing import Literal
from pydantic import BaseModel
"""
Karma
"""
class ValidKarmaUpdateRequest(BaseModel):
"""
Requires authentication
- **granter**: who updated the karma
- **keyword**: keyword to update karma for
- **flag**: either 0 (decrement) for --, or 1 (increment) for ++
"""
granter: str
keyword: str
flag: int
class ValidKarmaRetrievalRequest(BaseModel):
"""
- **keyword**: keyword to retrieve karma value of
"""
keyword: str
class ValidTopKarmaRequest(BaseModel):
"""
- **n**: Number of top results to return (default: 10)
"""
n: Optional[int] = 10
Station = Literal["main", "rock", "rap", "electronic", "pop"]
"""
LastFM
@@ -130,25 +99,6 @@ class ValidYTSearchRequest(BaseModel):
t: str = "rick astley - never gonna give you up"
"""
XC
"""
class ValidXCRequest(BaseModel):
"""
- **key**: valid XC API key
- **bid**: bot id
- **cmd**: bot command
- **data**: command data
"""
key: str
bid: int
cmd: str
data: Optional[dict]
"""
Transcriptions
"""
@@ -245,6 +195,7 @@ class ValidRadioSongRequest(BaseModel):
song: Optional[str] = None
artistsong: Optional[str] = None
alsoSkip: Optional[bool] = False
station: Station = "main"
class ValidRadioTypeaheadRequest(BaseModel):
@@ -269,35 +220,56 @@ class ValidRadioNextRequest(BaseModel):
"""
- **key**: API Key
- **skipTo**: UUID to skip to [optional]
- **station**: Station (default: "main")
"""
key: str
skipTo: Optional[str] = None
station: Station = "main"
class ValidRadioReshuffleRequest(ValidRadioNextRequest):
"""
- **key**: API Key
- **station**: Station (default: "main")
"""
class ValidRadioQueueRequest(BaseModel):
"""
- **draw**: DataTables draw count, default 1
- **start**: paging start position, default 0
- **search**: Optional search query
- **station**: Station (default: "main")
"""
draw: Optional[int] = 1
start: Optional[int] = 0
search: Optional[str] = None
station: Station = "main"
class ValidRadioQueueShiftRequest(BaseModel):
"""
- **key**: API Key
- **uuid**: UUID to shift
- **next**: Play next if true, immediately if false, default False
- **station**: Station (default: "main")
"""
key: str
uuid: str
next: Optional[bool] = False
station: Station = "main"
class ValidRadioQueueRemovalRequest(BaseModel):
"""
- **key**: API Key
- **uuid**: UUID to remove
- **station**: Station (default: "main")
"""
key: str
uuid: str
station: Station = "main"

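The Station literal introduced above gives every radio request model the same five-value whitelist; a minimal sketch of the validation behavior, assuming pydantic v2:

from typing import Literal, Optional
from pydantic import BaseModel, ValidationError

Station = Literal["main", "rock", "rap", "electronic", "pop"]

class ValidRadioSongRequest(BaseModel):
    key: str
    song: Optional[str] = None
    station: Station = "main"

ValidRadioSongRequest(key="k", station="rock")      # accepted
try:
    ValidRadioSongRequest(key="k", station="jazz")  # rejected: not a Station
except ValidationError as exc:
    print(exc.error_count(), "validation error")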

@@ -1,257 +0,0 @@
import os
import logging
import time
import datetime
import traceback
import aiosqlite as sqlite3
from typing import LiteralString, Optional, Union
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from .constructors import (
ValidTopKarmaRequest,
ValidKarmaRetrievalRequest,
ValidKarmaUpdateRequest,
)
class KarmaDB:
"""Karma DB Util"""
def __init__(self) -> None:
self.db_path: LiteralString = os.path.join(
"/", "usr", "local", "share", "sqlite_dbs", "karma.db"
)
async def get_karma(self, keyword: str) -> Union[int, dict]:
"""Get Karma Value for Keyword
Args:
keyword (str): The keyword to search
Returns:
Union[int, dict]
"""
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
"SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)
) as db_cursor:
try:
(score,) = await db_cursor.fetchone()
return score
except TypeError:
return {
"err": True,
"errorText": f"No records for {keyword}",
}
async def get_top(self, n: Optional[int] = 10) -> Optional[list[tuple]]:
"""
Get Top n=10 Karma Entries
Args:
n (Optional[int]) = 10: The number of top results to return
Returns:
list[tuple]
"""
try:
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
"SELECT keyword, score FROM karma ORDER BY score DESC LIMIT ?", (n,)
) as db_cursor:
return await db_cursor.fetchall()
except:
traceback.print_exc()
return None
async def update_karma(
self, granter: str, keyword: str, flag: int
) -> Optional[bool]:
"""
Update Karma for Keyword
Args:
granter (str): The user who granted (increased/decreased) the karma
keyword (str): The keyword to update
flag (int): 0 to increase karma, 1 to decrease karma
Returns:
Optional[bool]
"""
if not flag in [0, 1]:
return None
modifier: str = "score + 1" if not flag else "score - 1"
query: str = (
f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?"
)
new_keyword_query: str = (
"INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)"
)
friendly_flag: str = "++" if not flag else "--"
audit_message: str = (
f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
)
audit_query: str = (
"INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)"
)
now: int = int(time.time())
logging.debug("Audit message: %s{audit_message}\nKeyword: %s{keyword}")
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
audit_query,
(
keyword,
audit_message,
),
) as db_cursor:
await db_conn.commit()
async with await db_conn.execute(
query,
(
now,
keyword,
),
) as db_cursor:
if db_cursor.rowcount:
await db_conn.commit()
return True
if db_cursor.rowcount < 1: # Keyword does not already exist
await db_cursor.close()
new_val = 1 if not flag else -1
async with await db_conn.execute(
new_keyword_query,
(
keyword,
new_val,
now,
),
) as db_cursor:
if db_cursor.rowcount >= 1:
await db_conn.commit()
return True
else:
return False
return False
class Karma(FastAPI):
"""
Karma Endpoints
"""
def __init__(self, app: FastAPI, util, constants) -> None:
self.app: FastAPI = app
self.util = util
self.constants = constants
self.db = KarmaDB()
self.endpoints: dict = {
"karma/get": self.get_karma_handler,
"karma/modify": self.modify_karma_handler,
"karma/top": self.top_karma_handler,
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(
f"/{endpoint}", handler, methods=["POST"], include_in_schema=True
)
async def top_karma_handler(
self, request: Request, data: Optional[ValidTopKarmaRequest] = None
) -> JSONResponse:
"""
Get top keywords for karma
- **n**: Number of top results to return (default: 10)
"""
if not self.util.check_key(
request.url.path, request.headers.get("X-Authd-With")
):
raise HTTPException(status_code=403, detail="Unauthorized")
n: int = 10
if data and data.n:
n = int(data.n)
try:
top10: Optional[list[tuple]] = await self.db.get_top(n=n)
if not top10:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "General failure",
},
)
return JSONResponse(content=top10)
except:
traceback.print_exc()
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Exception occurred.",
},
)
async def get_karma_handler(
self, data: ValidKarmaRetrievalRequest, request: Request
) -> JSONResponse:
"""
Get current karma value
- **keyword**: Keyword to retrieve karma value for
"""
if not self.util.check_key(
request.url.path, request.headers.get("X-Authd-With")
):
raise HTTPException(status_code=403, detail="Unauthorized")
keyword: str = data.keyword
try:
count: Union[int, dict] = await self.db.get_karma(keyword)
return JSONResponse(
content={
"keyword": keyword,
"count": count,
}
)
except:
traceback.print_exc()
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Exception occurred.",
},
)
async def modify_karma_handler(
self, data: ValidKarmaUpdateRequest, request: Request
) -> JSONResponse:
"""
Update karma count
- **granter**: User who granted the karma
- **keyword**: The keyword to modify
- **flag**: 0 to decrement (--), 1 to increment (++)
"""
if not self.util.check_key(
request.url.path, request.headers.get("X-Authd-With"), 2
):
raise HTTPException(status_code=403, detail="Unauthorized")
if not data.flag in [0, 1]:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "Invalid request",
},
)
return JSONResponse(
content={
"success": await self.db.update_karma(
data.granter, data.keyword, data.flag
)
}
)


@@ -1,7 +1,9 @@
import importlib
import logging
import traceback
from typing import Optional, Union
from fastapi import FastAPI
from fastapi import FastAPI, Depends
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from .constructors import (
ValidArtistSearchRequest,
@@ -31,7 +33,11 @@ class LastFM(FastAPI):
for endpoint, handler in self.endpoints.items():
app.add_api_route(
f"/{endpoint}", handler, methods=["POST"], include_in_schema=True
f"/{endpoint}",
handler,
methods=["POST"],
include_in_schema=True,
dependencies=[Depends(RateLimiter(times=2, seconds=2))],
)
async def artist_by_name_handler(
@@ -202,7 +208,7 @@ class LastFM(FastAPI):
)
if not track_info_result:
return JSONResponse(
status_code=500,
status_code=200,
content={
"err": True,
"errorText": "Not found.",
@@ -214,7 +220,8 @@ class LastFM(FastAPI):
track_info_result.get("errorText", "??"),
)
return JSONResponse(content={"success": True, "result": track_info_result})
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return JSONResponse(
status_code=500,

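The rate-limiting change above repeats across most endpoint classes: fastapi-throttle's RateLimiter is attached as a per-route dependency. A minimal sketch of the same pattern on a standalone route, assuming the dependency interface shown in these diffs:

from fastapi import FastAPI, Depends
from fastapi.responses import JSONResponse
from fastapi_throttle import RateLimiter

app = FastAPI()

# At most 2 requests per 2-second window; excess requests are rejected
# by the dependency before the handler body runs.
@app.post("/demo", dependencies=[Depends(RateLimiter(times=2, seconds=2))])
async def demo() -> JSONResponse:
    return JSONResponse(content={"ok": True})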

@@ -3,7 +3,8 @@ import os
import urllib.parse
import regex
import aiosqlite as sqlite3
from fastapi import FastAPI, HTTPException
from fastapi import FastAPI, HTTPException, Depends
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from typing import LiteralString, Optional, Union, Iterable
from regex import Pattern
@@ -31,7 +32,7 @@ class CacheUtils:
_db.row_factory = sqlite3.Row
db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\
(TRIM(artist) || " - " || TRIM(song)) as ret FROM lyrics WHERE\
ret LIKE ? LIMIT 100"""
ret LIKE ? LIMIT 10"""
db_params: tuple[str] = (f"%%%{query}%%%",)
async with _db.execute(db_query, db_params) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
@@ -70,12 +71,23 @@ class LyricSearch(FastAPI):
)
for endpoint, handler in self.endpoints.items():
rate_limit: tuple[int, int] = (2, 3) # Default; (Times, Seconds)
_schema_include = endpoint in ["lyric/search"]
if (
endpoint == "typeahead/lyrics"
): # More permissive rate limiting for typeahead
rate_limit = (20, 2)
(times, seconds) = rate_limit
app.add_api_route(
f"/{endpoint}",
handler,
methods=["POST"],
include_in_schema=_schema_include,
dependencies=[Depends(RateLimiter(times=times, seconds=seconds))]
if not endpoint == "typeahead/lyrics"
else None,
)
async def typeahead_handler(self, data: ValidTypeAheadRequest) -> JSONResponse:
@@ -115,7 +127,7 @@ class LyricSearch(FastAPI):
if data.src.upper() not in self.acceptable_request_sources:
await self.notifier.send(
f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
f"Unknown request source: {data.src}",
)
return JSONResponse(
@@ -126,6 +138,14 @@ class LyricSearch(FastAPI):
},
)
if data.a == "N/A" and data.s == "N/A":
return JSONResponse(
status_code=200,
content={
"test": "success",
},
)
if not data.t:
search_artist: Optional[str] = data.a
search_song: Optional[str] = data.s
@@ -160,10 +180,14 @@ class LyricSearch(FastAPI):
)
if not result:
# if not data.lrc:
# await self.notifier.send(
# "DEBUG", f"Could not locate lyrics, request was:\n`{data}`"
# )
return JSONResponse(
content={
"err": True,
"errorText": "Sources exhausted, lyrics not located.",
"errorText": "Failed to locate lyrics on any available sources.",
}
)

endpoints/meme.py (new file)

@@ -0,0 +1,57 @@
from fastapi import FastAPI, Request, Response, Depends
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from utils.meme_util import MemeUtil
class Meme(FastAPI):
"""
Misc Endpoints
"""
def __init__(self, app: FastAPI, my_util, constants) -> None:
self.app: FastAPI = app
self.util = my_util
self.meme_util = MemeUtil(constants)
self.constants = constants
self.endpoints: dict = {
"memes/get_meme/{id:path}": self.get_meme_by_id,
"memes/random": self.random_meme,
"memes/list_memes": self.list_memes,
}
for endpoint, handler in self.endpoints.items():
dependencies = None
if endpoint == "memes/list_memes":
dependencies = [
Depends(RateLimiter(times=10, seconds=2))
] # Do not rate limit image retrievals (cached)
app.add_api_route(
f"/{endpoint}",
handler,
methods=["GET"],
include_in_schema=False,
dependencies=dependencies,
)
async def get_meme_by_id(self, id: int, request: Request) -> Response:
"""Get meme (image) by id"""
meme_image = await self.meme_util.get_meme_by_id(id)
if not meme_image:
return Response(status_code=404, content="Not found")
return Response(content=meme_image, media_type="image/png")
async def random_meme(self, request: Request) -> Response:
"""Get random meme (image)"""
meme_image = await self.meme_util.get_random_meme()
if not meme_image:
return Response(status_code=404, content="Not found")
return Response(content=meme_image, media_type="image/png")
async def list_memes(self, page: int, request: Request) -> Response:
"""Get meme (image) by id"""
meme_list = await self.meme_util.list_memes(page)
page_count = await self.meme_util.get_page_count()
return JSONResponse(
content={"paging": {"current": page, "of": page_count}, "memes": meme_list}
)

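A hypothetical client call against the new paged listing route, assuming a local deployment (host and port are placeholders):

import httpx

# `page` is a required query parameter; the response pairs the requested
# page of meme records with paging metadata.
resp = httpx.get("http://localhost:8000/memes/list_memes", params={"page": 1})
data = resp.json()
print(data["paging"])   # e.g. {"current": 1, "of": <page_count>}
print(len(data["memes"]))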

@@ -1,10 +1,20 @@
import logging
import time
import os
from typing import Optional, Annotated
from fastapi import FastAPI, Request, UploadFile, Response, HTTPException, Form
import json
import random
from typing import Any, Optional, Annotated
from fastapi import FastAPI, Request, UploadFile, Response, HTTPException, Form, Depends
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
import redis.asyncio as redis
import redis
from rq import Queue
from rq.registry import (
StartedJobRegistry,
FinishedJobRegistry,
FailedJobRegistry,
DeferredJobRegistry,
)
from lyric_search.sources import private, cache as LyricsCache, redis_cache
@@ -19,26 +29,64 @@ class Misc(FastAPI):
self.constants = constants
self.lyr_cache = LyricsCache.Cache()
self.redis_cache = redis_cache.RedisCache()
self.redis_client = redis.Redis(password=private.REDIS_PW)
self.redis_client: Any = redis.Redis(password=private.REDIS_PW)
self.radio = radio
self.activity_image: Optional[bytes] = None
self.nos_json_path: str = os.path.join(
"/", "usr", "local", "share", "naas", "reasons.json"
)
self.nos: list[str] = []
self.last_5_nos: list[str] = []
self.endpoints: dict = {
"widget/redis": self.homepage_redis_widget,
"widget/sqlite": self.homepage_sqlite_widget,
"widget/lyrics": self.homepage_lyrics_widget,
"widget/radio": self.homepage_radio_widget,
"widget/rq": self.homepage_rq_widget,
"misc/get_activity_image": self.get_activity_image,
"misc/no": self.no,
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(
f"/{endpoint}", handler, methods=["GET"], include_in_schema=True
f"/{endpoint}",
handler,
methods=["GET"],
include_in_schema=True,
dependencies=[Depends(RateLimiter(times=10, seconds=2))],
)
app.add_api_route(
"/misc/upload_activity_image", self.upload_activity_image, methods=["POST"]
"/misc/upload_activity_image",
self.upload_activity_image,
methods=["POST"],
dependencies=[Depends(RateLimiter(times=10, seconds=2))],
)
logging.debug("Loading NaaS reasons")
with open(self.nos_json_path, "r", encoding="utf-8") as f:
self.nos = json.loads(f.read())
logging.debug("Loaded %s reasons", len(self.nos))
def get_no(self) -> str:
try:
no = random.choice(self.nos)
if no in self.last_5_nos:
return self.get_no() # recurse
self.last_5_nos.append(no)
if len(self.last_5_nos) >= 5:
self.last_5_nos.pop(0)
return no
except Exception as e:
logging.debug("Exception: %s", str(e))
return "No."
async def no(self) -> JSONResponse:
"""NaaS"""
return JSONResponse(content={"no": self.get_no()})
async def upload_activity_image(
self, image: UploadFile, key: Annotated[str, Form()], request: Request
) -> Response:
@@ -75,7 +123,7 @@ class Misc(FastAPI):
with open(fallback_path, "rb") as f:
return Response(content=f.read(), media_type="image/png")
async def get_radio_np(self) -> tuple[str, str, str]:
async def get_radio_np(self, station: str = "main") -> tuple[str, str, str]:
"""
Get radio now playing
Args:
@@ -84,8 +132,13 @@ class Misc(FastAPI):
str: Radio now playing in artist - song format
"""
np: dict = self.radio.radio_util.now_playing
artistsong: str = np.get("artistsong", "N/A - N/A")
np: dict = self.radio.radio_util.now_playing[station]
artistsong: str = "N/A - N/A"
artist = np.get("artist")
song = np.get("song")
if artist and song:
artistsong = f"{artist} - {song}"
album: str = np.get("album", "N/A")
genre: str = np.get("genre", "N/A")
return (artistsong, album, genre)
@@ -96,25 +149,45 @@ class Misc(FastAPI):
"""
# Measure response time w/ test lyric search
time_start: float = time.time() # Start time for response_time
test_lyrics_result = await self.redis_client.ft().search(
test_lyrics_result = self.redis_client.ft().search( # noqa: F841
"@artist: test @song: test"
)
time_end: float = time.time()
# End response time test
total_keys = await self.redis_client.dbsize()
total_keys = self.redis_client.dbsize()
response_time: float = time_end - time_start
(_, ci_keys) = await self.redis_client.scan(
cursor=0, match="ci_session*", count=10000000
)
num_ci_keys = len(ci_keys)
index_info = await self.redis_client.ft().info()
index_info = self.redis_client.ft().info()
indexed_lyrics: int = index_info.get("num_docs")
return JSONResponse(
content={
"responseTime": round(response_time, 7),
"storedKeys": total_keys,
"indexedLyrics": indexed_lyrics,
"sessions": num_ci_keys,
"sessions": -1,
}
)
async def homepage_rq_widget(self) -> JSONResponse:
"""
Homepage RQ Widget Handler
"""
queue_name = "dls"
queue = Queue(queue_name, self.redis_client)
queued = queue.count
started = StartedJobRegistry(queue_name, connection=self.redis_client).count
failed = FailedJobRegistry(queue_name, connection=self.redis_client).count
finished = FinishedJobRegistry(queue_name, connection=self.redis_client).count
deferred = DeferredJobRegistry(queue_name, connection=self.redis_client).count
return JSONResponse(
content={
queue_name: {
"queued": queued,
"started": started,
"failed": failed,
"finished": finished,
"deferred": deferred,
}
}
)
@@ -153,11 +226,11 @@ class Misc(FastAPI):
)
return JSONResponse(content=found_counts)
async def homepage_radio_widget(self) -> JSONResponse:
async def homepage_radio_widget(self, station: str = "main") -> JSONResponse:
"""
Homepage Radio Widget Handler
"""
radio_np: tuple = await self.get_radio_np()
radio_np: tuple = await self.get_radio_np(station)
if not radio_np:
return JSONResponse(
status_code=500,


@@ -2,7 +2,6 @@ import logging
import traceback
import time
import random
import asyncio
from .constructors import (
ValidRadioNextRequest,
ValidRadioReshuffleRequest,
@@ -10,24 +9,31 @@ from .constructors import (
ValidRadioQueueRemovalRequest,
ValidRadioSongRequest,
ValidRadioTypeaheadRequest,
ValidRadioQueueRequest,
Station
)
from utils import radio_util
from uuid import uuid4 as uuid
from typing import Optional
from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException
from fastapi import (
FastAPI,
BackgroundTasks,
Request,
Response,
HTTPException,
Depends)
from fastapi_throttle import RateLimiter
from fastapi.responses import RedirectResponse, JSONResponse
class Radio(FastAPI):
"""Radio Endpoints"""
def __init__(self, app: FastAPI, my_util, constants) -> None:
def __init__(self, app: FastAPI, my_util, constants, loop) -> None:
self.app: FastAPI = app
self.util = my_util
self.constants = constants
self.radio_util = radio_util.RadioUtil(self.constants)
self.loop = loop
self.radio_util = radio_util.RadioUtil(self.constants, self.loop)
self.playlists_loaded: bool = False
self.endpoints: dict = {
"radio/np": self.radio_now_playing,
"radio/request": self.radio_request,
@@ -38,23 +44,25 @@ class Radio(FastAPI):
"radio/reshuffle": self.radio_reshuffle,
"radio/queue_remove": self.radio_queue_remove,
"radio/ls._next_": self.radio_get_next,
"radio/album_art": self.album_art_handler,
}
for endpoint, handler in self.endpoints.items():
methods: list[str] = ["POST"]
if endpoint == "radio/album_art":
methods = ["GET"]
app.add_api_route(
f"/{endpoint}", handler, methods=["POST"], include_in_schema=True
f"/{endpoint}", handler, methods=methods, include_in_schema=False,
dependencies=[Depends(
RateLimiter(times=10, seconds=2))] if not endpoint == "radio/np" else None,
)
# NOTE: Not in loop because method is GET for this endpoint
app.add_api_route(
"/radio/album_art",
self.album_art_handler,
methods=["GET"],
include_in_schema=True,
)
app.add_event_handler("startup", self.on_start)
asyncio.get_event_loop().run_until_complete(self.radio_util.load_playlist())
asyncio.get_event_loop().run_until_complete(self.radio_util._ls_skip())
async def on_start(self) -> None:
stations = ", ".join(self.radio_util.db_queries.keys())
logging.info("radio: Initializing stations:\n%s", stations)
await self.radio_util.load_playlists()
async def radio_skip(
self, data: ValidRadioNextRequest, request: Request
@@ -63,12 +71,13 @@ class Radio(FastAPI):
Skip to the next track in the queue, or to uuid specified in skipTo if provided
- **key**: API key
- **skipTo**: Optional UUID to skip to
- **station**: default "main"
"""
try:
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
if data.skipTo:
queue_item = self.radio_util.get_queue_item_by_uuid(data.skipTo)
queue_item = self.radio_util.get_queue_item_by_uuid(data.skipTo, data.station)
if not queue_item:
return JSONResponse(
status_code=500,
@@ -77,12 +86,10 @@ class Radio(FastAPI):
"errorText": "No such queue item.",
},
)
self.radio_util.active_playlist = self.radio_util.active_playlist[
self.radio_util.active_playlist[data.station] = self.radio_util.active_playlist[data.station][
queue_item[0] :
]
if not self.radio_util.active_playlist:
await self.radio_util.load_playlist()
skip_result: bool = await self.radio_util._ls_skip()
skip_result: bool = await self.radio_util._ls_skip(data.station)
status_code = 200 if skip_result else 500
return JSONResponse(
status_code=status_code,
@@ -91,14 +98,17 @@ class Radio(FastAPI):
},
)
except Exception as e:
logging.debug("radio_skip Exception: %s", str(e))
traceback.print_exc()
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "General failure.",
},
if not isinstance(e, HTTPException):
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "General failure.",
},
)
raise e # Re-raise HTTPException
async def radio_reshuffle(
self, data: ValidRadioReshuffleRequest, request: Request
@@ -106,26 +116,57 @@ class Radio(FastAPI):
"""
Reshuffle the play queue
- **key**: API key
- **station**: default "main"
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
random.shuffle(self.radio_util.active_playlist)
random.shuffle(self.radio_util.active_playlist[data.station])
return JSONResponse(content={"ok": True})
async def radio_get_queue(
self, request: Request, limit: Optional[int] = 15_000
self,
request: Request,
data: Optional[ValidRadioQueueRequest] = None,
) -> JSONResponse:
"""
Get current play queue, up to limit [default: 15k]
- **limit**: Number of queue items to return, default 15k
Get current play queue (paged, 20 results per page)
"""
queue: list = self.radio_util.active_playlist[0:limit]
if not (data and data.station):
return JSONResponse(status_code=500,
content={
"err": True,
"errorText": "Invalid request.",
})
search: Optional[str] = None
draw: int = 0
if isinstance(data, ValidRadioQueueRequest):
search = data.search
draw = data.draw or 0
start: int = int(data.start or 0)
end: int = start + 20
else:
start: int = 0
end: int = 20
orig_queue: list[dict] = self.radio_util.active_playlist[data.station]
if not search:
queue_full: Optional[list] = orig_queue
else:
queue_full = self.radio_util.datatables_search(search, data.station)
if not queue_full:
return JSONResponse(
status_code=500,
content={
"err": True,
"errorText": "No queue found.",
}
)
queue: list = queue_full[start:end]
queue_out: list[dict] = []
for x, item in enumerate(queue):
queue_out.append(
{
"pos": x,
"pos": orig_queue.index(item),
"id": item.get("id"),
"uuid": item.get("uuid"),
"artist": item.get("artist"),
@@ -136,7 +177,15 @@ class Radio(FastAPI):
"duration": item.get("duration"),
}
)
return JSONResponse(content={"items": queue_out})
full_playlist_len: int = len(orig_queue)
filtered_len: int = len(queue_full)
out_json = {
"draw": draw,
"recordsTotal": full_playlist_len,
"recordsFiltered": filtered_len,
"items": queue_out,
}
return JSONResponse(content=out_json)
async def radio_queue_shift(
self, data: ValidRadioQueueShiftRequest, request: Request
@@ -147,11 +196,12 @@ class Radio(FastAPI):
- **key**: API key
- **uuid**: UUID to shift
- **next**: Play track next? If False, skips to the track
- **station**: default "main"
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid)
queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid, data.station)
if not queue_item:
return JSONResponse(
status_code=500,
@@ -161,10 +211,10 @@ class Radio(FastAPI):
},
)
(x, item) = queue_item
self.radio_util.active_playlist.pop(x)
self.radio_util.active_playlist.insert(0, item)
self.radio_util.active_playlist[data.station].pop(x)
self.radio_util.active_playlist[data.station].insert(0, item)
if not data.next:
await self.radio_util._ls_skip()
await self.radio_util._ls_skip(data.station)
return JSONResponse(
content={
"ok": True,
@@ -178,11 +228,12 @@ class Radio(FastAPI):
Remove an item from the current play queue
- **key**: API key
- **uuid**: UUID of queue item to remove
- **station**: default "main"
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid)
queue_item = self.radio_util.get_queue_item_by_uuid(data.uuid, data.station)
if not queue_item:
return JSONResponse(
status_code=500,
@@ -191,7 +242,7 @@ class Radio(FastAPI):
"errorText": "Queue item not found.",
},
)
self.radio_util.active_playlist.pop(queue_item[0])
self.radio_util.active_playlist[data.station].pop(queue_item[0])
return JSONResponse(
content={
"ok": True,
@@ -199,18 +250,27 @@ class Radio(FastAPI):
)
async def album_art_handler(
self, request: Request, track_id: Optional[int] = None
self, request: Request, track_id: Optional[int] = None,
station: Station = "main"
) -> Response:
"""
Get album art, optional parameter track_id may be specified.
Otherwise, current track album art will be pulled.
- **track_id**: Optional, if provided, will attempt to retrieve the album art of this track_id. Current track used otherwise.
- **station**: default "main"
"""
try:
if not track_id:
track_id = self.radio_util.now_playing.get("id")
track_id = self.radio_util.now_playing[station].get("id")
if not track_id:
# Still no track ID
return JSONResponse(status_code=500,
content={
"err": True,
"errorText": "Invalid request",
})
logging.debug("Seeking album art with trackId: %s", track_id)
album_art: Optional[bytes] = await self.radio_util.get_album_art(
album_art: Optional[bytes] = self.radio_util.get_album_art(
track_id=track_id
)
if not album_art:
@@ -220,16 +280,20 @@ class Radio(FastAPI):
)
return Response(content=album_art, media_type="image/png")
except Exception as e:
logging.debug("album_art_handler Exception: %s", str(e))
traceback.print_exc()
return RedirectResponse(
url="https://codey.lol/images/radio_art_default.jpg", status_code=302
)
async def radio_now_playing(self, request: Request) -> JSONResponse:
async def radio_now_playing(self, request: Request,
station: Station = "main") -> JSONResponse:
"""
Get currently playing track info
- **station**: default "main"
"""
ret_obj: dict = {**self.radio_util.now_playing}
ret_obj: dict = {**self.radio_util.now_playing[station]}
ret_obj["station"] = station
try:
ret_obj["elapsed"] = int(time.time()) - ret_obj["start"]
except KeyError:
@@ -246,18 +310,23 @@ class Radio(FastAPI):
) -> JSONResponse:
"""
Get next track
Track will be removed from the queue in the process.
(Track will be removed from the queue in the process.)
- **key**: API key
- **skipTo**: Optional UUID to skip to
- **station**: default: "main"
"""
logging.info("Radio get next")
if data.station not in self.radio_util.active_playlist.keys():
raise HTTPException(status_code=500, detail="No such station/not ready")
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
if (
not isinstance(self.radio_util.active_playlist, list)
or not self.radio_util.active_playlist
not isinstance(self.radio_util.active_playlist[data.station], list)
or not self.radio_util.active_playlist[data.station]
):
await self.radio_util.load_playlist()
await self.radio_util._ls_skip()
if self.radio_util.playlists_loaded:
self.radio_util.playlists_loaded = False
await self.on_start()
return JSONResponse(
status_code=500,
content={
@@ -265,11 +334,10 @@ class Radio(FastAPI):
"errorText": "General failure occurred, prompting playlist reload.",
},
)
next = self.radio_util.active_playlist.pop(0)
next = self.radio_util.active_playlist[data.station].pop(0)
if not isinstance(next, dict):
logging.critical("next is of type: %s, reloading playlist...", type(next))
await self.radio_util.load_playlist()
await self.radio_util._ls_skip()
await self.on_start()
return JSONResponse(
status_code=500,
content={
@@ -282,23 +350,20 @@ class Radio(FastAPI):
time_started: int = int(time.time())
time_ends: int = int(time_started + duration)
if len(self.radio_util.active_playlist) > 1:
self.radio_util.active_playlist.append(next) # Push to end of playlist
else:
await self.radio_util.load_playlist()
self.radio_util.active_playlist[data.station].append(next) # Push to end of playlist
self.radio_util.now_playing = next
self.radio_util.now_playing[data.station] = next
next["start"] = time_started
next["end"] = time_ends
try:
background_tasks.add_task(self.radio_util.webhook_song_change, next)
background_tasks.add_task(self.radio_util.webhook_song_change, next, data.station)
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
try:
album_art = await self.radio_util.get_album_art(track_id=next["id"])
album_art = self.radio_util.get_album_art(track_id=next["id"])
if not album_art:
await self.radio_util.cache_album_art(next["id"], next["file_path"])
self.radio_util.cache_album_art(next["id"], next["file_path"])
except Exception as e:
logging.info("radio_get_next Exception: %s", str(e))
traceback.print_exc()
@@ -314,6 +379,7 @@ class Radio(FastAPI):
- **song**: Song to search
- **artistsong**: Optional "Artist - Song" pair to search, in place of artist/song
- **alsoSkip**: If True, skips to the track; otherwise, track will be placed next up in queue
- **station**: default "main"
"""
if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
raise HTTPException(status_code=403, detail="Unauthorized")
@@ -337,14 +403,14 @@ class Radio(FastAPI):
},
)
search: bool = await self.radio_util.search_playlist(
artistsong=artistsong, artist=artist, song=song
search: bool = self.radio_util.search_db(
artistsong=artistsong, artist=artist, song=song, station=data.station
)
if data.alsoSkip:
await self.radio_util._ls_skip()
await self.radio_util._ls_skip(data.station)
return JSONResponse(content={"result": search})
async def radio_typeahead(
def radio_typeahead(
self, data: ValidRadioTypeaheadRequest, request: Request
) -> JSONResponse:
"""
@@ -359,9 +425,7 @@ class Radio(FastAPI):
"errorText": "Invalid request.",
},
)
typeahead: Optional[list[str]] = await self.radio_util.trackdb_typeahead(
data.query
)
typeahead: Optional[list[str]] = self.radio_util.trackdb_typeahead(data.query)
if not typeahead:
return JSONResponse(content=[])
return JSONResponse(content=typeahead)

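radio_get_queue now implements DataTables' server-side contract: the client posts draw/start/search (plus station), and the server echoes draw while reporting recordsTotal (unfiltered length) versus recordsFiltered (matches after search). A hypothetical request/response pair, shapes taken from the handler above, values illustrative:

request_body = {
    "draw": 3,           # DataTables' request counter, echoed back verbatim
    "start": 20,         # offset; the handler serves fixed 20-row pages
    "search": "daft",    # optional filter passed to datatables_search
    "station": "main",
}

response_body = {
    "draw": 3,
    "recordsTotal": 15000,    # full playlist length for the station
    "recordsFiltered": 42,    # rows matching "daft"
    "items": [
        # 20 rows max; "pos" is the index within the *unfiltered* queue,
        # which is the queue-position bugfix noted in the commit log.
        {"pos": 981, "uuid": "...", "artist": "...", "song": "..."},
    ],
}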

@@ -2,7 +2,8 @@ import os
import random
from typing import LiteralString, Optional, Union
import aiosqlite as sqlite3
from fastapi import FastAPI
from fastapi import FastAPI, Depends
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from .constructors import RandMsgRequest
@@ -19,7 +20,10 @@ class RandMsg(FastAPI):
self.endpoint_name = "randmsg"
app.add_api_route(
f"/{self.endpoint_name}", self.randmsg_handler, methods=["POST"]
f"/{self.endpoint_name}",
self.randmsg_handler,
methods=["POST"],
dependencies=[Depends(RateLimiter(times=5, seconds=2))],
)
async def randmsg_handler(
@@ -35,18 +39,18 @@ class RandMsg(FastAPI):
short = data.short
if short:
db_rand_selected: int = 9
db_rand_selected = random.choice([0, 1, 3])
db_rand_selected = random.choice([3])
title_attr: str = "Unknown"
randmsg_db_path: Optional[Union[str, LiteralString]] = None
db_query: Optional[str] = None
match db_rand_selected:
case 0:
randmsg_db_path: Union[str, LiteralString] = os.path.join(
randmsg_db_path = os.path.join(
"/usr/local/share", "sqlite_dbs", "qajoke.db"
) # For qajoke db
db_query: str = (
"SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
db_query = "SELECT id, ('<b>Q:</b> ' || question || '<br/><b>A:</b> ' \
|| answer) FROM jokes ORDER BY RANDOM() LIMIT 1" # For qajoke db
)
title_attr = "QA Joke DB"
case 1 | 9:
randmsg_db_path = os.path.join(
@@ -86,9 +90,20 @@ class RandMsg(FastAPI):
WHERE score >= 10000 ORDER BY RANDOM() LIMIT 1"""
title_attr = "r/jokes DB"
if not randmsg_db_path:
return JSONResponse(
content={
"err": True,
}
)
async with sqlite3.connect(database=randmsg_db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: sqlite3.Row = await _cursor.fetchone()
if not isinstance(_cursor, sqlite3.Cursor):
return JSONResponse(content={"err": True})
result: Optional[sqlite3.Row] = await _cursor.fetchone()
if not result:
return JSONResponse(content={"err": True})
(result_id, result_msg) = result
result_msg = result_msg.strip()
return JSONResponse(

endpoints/rip.py (new file)

@@ -0,0 +1,281 @@
import logging
from fastapi import FastAPI, Request, Response, Depends
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from utils.sr_wrapper import SRUtil
from auth.deps import get_current_user
from redis import Redis
from rq import Queue, Retry
from rq.job import Job
from rq.job import JobStatus
from rq.registry import (
StartedJobRegistry,
DeferredJobRegistry,
FinishedJobRegistry,
FailedJobRegistry,
ScheduledJobRegistry,
)
from utils.rip_background import bulk_download
from lyric_search.sources import private
from typing import Literal
from pydantic import BaseModel
class ValidBulkFetchRequest(BaseModel):
track_ids: list[int]
target: str
quality: Literal["FLAC", "Lossy"] = "FLAC"
class RIP(FastAPI):
"""
Ripping Endpoints
"""
def __init__(self, app: FastAPI, my_util, constants) -> None:
self.app: FastAPI = app
self.util = my_util
self.trip_util = SRUtil()
self.constants = constants
self.redis_conn = Redis(
host="localhost",
port=6379,
db=0,
password=private.REDIS_PW,
)
self.task_queue = Queue(
"dls",
connection=self.redis_conn,
default_timeout=14400,
default_result_ttl=-1,
default_failure_ttl=86400,
)
self.endpoints: dict = {
"trip/get_artists_by_name": self.artists_by_name_handler,
"trip/get_albums_by_artist_id/{artist_id:path}": self.albums_by_artist_id_handler,
"trip/get_tracks_by_artist_song": self.tracks_by_artist_song_handler,
"trip/get_tracks_by_album_id/{album_id:path}": self.tracks_by_album_id_handler,
"trip/get_track_by_id/{track_id:path}": self.track_by_id_handler,
"trip/bulk_fetch": self.bulk_fetch_handler,
"trip/job/{job_id:path}": self.job_status_handler,
"trip/jobs/list": self.job_list_handler,
}
for endpoint, handler in self.endpoints.items():
dependencies = [Depends(RateLimiter(times=8, seconds=2))]
app.add_api_route(
f"/{endpoint}",
handler,
methods=["GET"] if endpoint != "trip/bulk_fetch" else ["POST"],
include_in_schema=True,
dependencies=dependencies,
)
def _format_job(self, job: Job):
"""Helper to normalize job data into JSON."""
job_status: str | JobStatus = job.get_status()
progress = job.meta.get("progress", 0)
if progress == 100 and not job.meta.get("tarball"):
job_status = "Compressing"
tracks_in = job.meta.get("tracks_in")
tracks_out = len(job.meta.get("tracks", []))
return {
"id": job.id,
"status": job_status.title(),
"result": job.result,
"tarball": job.meta.get("tarball"),
"enqueued_at": job.enqueued_at,
"started_at": job.started_at,
"ended_at": job.ended_at,
"progress": progress,
"tracks": f"{tracks_out} / {tracks_in}"
if isinstance(tracks_in, int)
else tracks_out,
"target": job.meta.get("target"),
"quality": job.meta.get("quality", "Unknown"),
}
async def artists_by_name_handler(
self, artist: str, request: Request, user=Depends(get_current_user)
) -> Response:
"""Get artists by name"""
artists = await self.trip_util.get_artists_by_name(artist)
if not artists:
return Response(status_code=404, content="Not found")
return JSONResponse(content=artists)
async def albums_by_artist_id_handler(
self, artist_id: int, request: Request, user=Depends(get_current_user)
) -> Response:
"""Get albums by artist ID"""
albums = await self.trip_util.get_albums_by_artist_id(artist_id)
if not albums:
return Response(status_code=404, content="Not found")
return JSONResponse(content=albums)
async def tracks_by_album_id_handler(
self,
album_id: int,
request: Request,
user=Depends(get_current_user),
quality: Literal["FLAC", "Lossy"] = "FLAC",
) -> Response:
"""Get tracks by album id"""
tracks = await self.trip_util.get_tracks_by_album_id(album_id, quality)
if not tracks:
return Response(status_code=404, content="Not Found")
return JSONResponse(content=tracks)
async def tracks_by_artist_song_handler(
self, artist: str, song: str, request: Request, user=Depends(get_current_user)
) -> Response:
"""Get tracks by artist and song name"""
logging.critical("Searching for tracks by artist: %s, song: %s", artist, song)
tracks = await self.trip_util.get_tracks_by_artist_song(artist, song)
if not tracks:
return Response(status_code=404, content="Not found")
return JSONResponse(content=tracks)
async def track_by_id_handler(
self,
track_id: int,
request: Request,
quality: Literal["FLAC", "Lossy"] = "FLAC",
user=Depends(get_current_user),
) -> Response:
"""Get track by ID"""
track = await self.trip_util.get_stream_url_by_track_id(track_id, quality)
if not track:
return Response(status_code=404, content="Not found")
return JSONResponse(content={"stream_url": track})
async def bulk_fetch_handler(
self,
data: ValidBulkFetchRequest,
request: Request,
user=Depends(get_current_user),
) -> Response:
"""Bulk fetch a list of track IDs"""
if not data or not data.track_ids or not data.target:
return JSONResponse(
content={
"err": True,
"errorText": "Invalid data",
}
)
track_ids = data.track_ids
target = data.target
job = self.task_queue.enqueue(
bulk_download,
args=(
track_ids,
data.quality,
),
job_timeout=14400,
failure_ttl=86400,
result_ttl=-1,
retry=Retry(max=1, interval=[30]),
meta={
"progress": 0,
"status": "Queued",
"target": target,
"tracks_in": len(track_ids),
"quality": data.quality,
},
)
self.redis_conn.lpush("enqueued_job_ids", job.id)
return JSONResponse(
content={
"job_id": job.id,
"status": "Queued",
"target": job.meta.get("target", None),
"quality": job.meta.get("quality", "Unknown"),
}
)
async def job_status_handler(
self, job_id: str, request: Request, user=Depends(get_current_user)
):
"""Get status and result of a single job"""
job = None
try:
# Try direct fetch first
job = Job.fetch(job_id, connection=self.redis_conn)
except Exception:
# If not found, try registries explicitly (in case fetch failed because the job left the queue)
registries = [
StartedJobRegistry(queue=self.task_queue),
FinishedJobRegistry(queue=self.task_queue),
FailedJobRegistry(queue=self.task_queue),
DeferredJobRegistry(queue=self.task_queue),
ScheduledJobRegistry(queue=self.task_queue),
]
for registry in registries:
if job_id in registry.get_job_ids():
try:
job = Job.fetch(job_id, connection=self.redis_conn)
except Exception:
pass
break
if job is None:
return JSONResponse({"error": "Job not found"}, status_code=404)
return self._format_job(job)
async def job_list_handler(self, request: Request, user=Depends(get_current_user)):
"""List all jobs across all registries (queued, started, finished, failed, etc)."""
jobs_info = []
seen = set()
# 1. Jobs still waiting in queue
for job in self.task_queue.jobs:
jobs_info.append(self._format_job(job))
seen.add(job.id)
# 2. Jobs in Started/Finished/Failed/Deferred registries
registries = [
StartedJobRegistry(queue=self.task_queue),
FinishedJobRegistry(queue=self.task_queue),
FailedJobRegistry(queue=self.task_queue),
DeferredJobRegistry(queue=self.task_queue),
]
for registry in registries:
for jid in registry.get_job_ids():
if jid in seen:
continue
try:
job = Job.fetch(jid, connection=self.redis_conn)
jobs_info.append(self._format_job(job))
seen.add(job.id)
except Exception:
continue # job might have been cleaned up
# 3. Jobs tracked in your custom enqueued_job_ids list
job_ids = self.redis_conn.lrange("enqueued_job_ids", 0, -1)
for jid_bytes in job_ids: # type: ignore
jid = jid_bytes.decode()
if jid in seen:
continue
try:
job = Job.fetch(jid, connection=self.redis_conn)
jobs_info.append(self._format_job(job))
seen.add(job.id)
except Exception:
continue
# ---- Sort newest first ----
def job_sort_key(job):
return (
job.get("ended_at")
or job.get("started_at")
or job.get("enqueued_at")
or 0
)
jobs_info.sort(key=job_sort_key, reverse=True)
return {"jobs": jobs_info}

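The RIP endpoints track downloads through RQ's job registries once jobs leave the queue. A minimal sketch of the enqueue-then-inspect flow used above, assuming a local Redis and a hypothetical download_tracks worker:

from redis import Redis
from rq import Queue, Retry
from rq.job import Job

redis_conn = Redis(host="localhost", port=6379, db=0)
queue = Queue("dls", connection=redis_conn)

def download_tracks(track_ids, quality):
    ...  # hypothetical worker; the repo enqueues utils.rip_background.bulk_download

job = queue.enqueue(
    download_tracks,
    args=([101, 102], "FLAC"),
    retry=Retry(max=1, interval=[30]),
    meta={"progress": 0, "tracks_in": 2},  # custom fields read back by _format_job
)

# Later, as job_status_handler does: fetch by id, read status and meta.
fetched = Job.fetch(job.id, connection=redis_conn)
print(fetched.get_status(), fetched.meta.get("progress"))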

@@ -1,8 +1,9 @@
import os
import aiosqlite as sqlite3
from fastapi import FastAPI
from fastapi import FastAPI, Depends, Response
from fastapi_throttle import RateLimiter
from fastapi.responses import JSONResponse
from typing import Optional, LiteralString, Union
from typing import Optional, LiteralString, Union, Iterable, cast
from .constructors import ValidShowEpisodeLineRequest, ValidShowEpisodeListRequest
@@ -24,7 +25,11 @@ class Transcriptions(FastAPI):
for endpoint, handler in self.endpoints.items():
app.add_api_route(
f"/{endpoint}", handler, methods=["POST"], include_in_schema=True
f"/{endpoint}",
handler,
methods=["POST"],
include_in_schema=True,
dependencies=[Depends(RateLimiter(times=2, seconds=2))],
)
async def get_episodes_handler(
@@ -83,7 +88,7 @@ class Transcriptions(FastAPI):
async with sqlite3.connect(database=db_path, timeout=1) as _db:
async with await _db.execute(db_query) as _cursor:
result: list[tuple] = await _cursor.fetchall()
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
return JSONResponse(
content={
"show_title": show_title,
@@ -99,7 +104,7 @@ class Transcriptions(FastAPI):
async def get_episode_lines_handler(
self, data: ValidShowEpisodeLineRequest
) -> JSONResponse:
) -> Response:
"""
Get lines for a particular episode
- **s**: Show ID to query
@@ -113,9 +118,7 @@ class Transcriptions(FastAPI):
db_path: Union[str, LiteralString] = os.path.join(
"/usr/local/share", "sqlite_dbs", "sp.db"
)
db_query: str = (
"""SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
)
db_query: str = """SELECT ("S" || Season || "E" || Episode || " " || Title), Character, Line FROM SP_DAT WHERE ID = ?"""
case 1:
db_path = os.path.join("/usr/local/share", "sqlite_dbs", "futur.db")
db_query = """SELECT ("S" || EP_S || "E" || EP_EP || " " || EP_TITLE || "<br><em>Opener: " || EP_OPENER || "</em>"), EP_LINE_SPEAKER, EP_LINE FROM clean_dialog WHERE EP_ID = ? ORDER BY LINE_ID ASC"""
@@ -135,8 +138,14 @@ class Transcriptions(FastAPI):
async with sqlite3.connect(database=db_path, timeout=1) as _db:
params: tuple = (episode_id,)
async with await _db.execute(db_query, params) as _cursor:
result: list[tuple] = await _cursor.fetchall()
first_result: tuple = result[0]
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
result_list = cast(list[sqlite3.Row], result)
if not result_list:
return Response(
status_code=404,
content="Not found",
)
first_result: sqlite3.Row = result_list[0]
return JSONResponse(
content={
"episode_id": episode_id,


@@ -1,6 +1,7 @@
import importlib
from fastapi import FastAPI
from fastapi import FastAPI, Depends
from fastapi.responses import JSONResponse
from fastapi_throttle import RateLimiter
from typing import Optional, Union
from .constructors import ValidYTSearchRequest
@@ -22,14 +23,15 @@ class YT(FastAPI):
for endpoint, handler in self.endpoints.items():
app.add_api_route(
f"/{endpoint}", handler, methods=["POST"], include_in_schema=True
f"/{endpoint}",
handler,
methods=["POST"],
include_in_schema=True,
dependencies=[Depends(RateLimiter(times=2, seconds=2))],
)
async def yt_video_search_handler(self, data: ValidYTSearchRequest) -> JSONResponse:
"""
Search for YT Video by Title (closest match returned)
- **t**: Title to search
"""
"""Search for YT Video by Title (closest match returned)"""
title: str = data.t
yts_res: Optional[list[dict]] = await self.ytsearch.search(title)


@@ -1,3 +1,4 @@
import logging
from typing import Optional
from openai import AsyncOpenAI
@@ -10,8 +11,15 @@ class GPT:
api_key=self.api_key,
timeout=10.0,
)
self.default_system_prompt: str = """You are a helpful assistant who will provide only totally accurate tidbits of \
info on the specific songs the user may listen to."""
self.default_system_prompt: str = """You are a helpful assistant who will provide ONLY TOTALLY ACCURATE (!!) tidbits of \
info on the specific songs the user may listen to.
# IMPORTANT
As an AI assistant, you may not always have much information available to describe the track provided. That is TOTALLY FINE!
What is not acceptable is hallucinations, for example:
- Do NOT mention the name of the album the track was included on. You rarely have correct information in this context.
- If no 100% reliable data is available, do NOT (!!) mention the album..."""
logging.getLogger("httpx").setLevel("CRITICAL")
async def get_completion(
self, prompt: str, system_prompt: Optional[str] = None
@@ -30,7 +38,8 @@ class GPT:
},
],
model="gpt-4o-mini",
temperature=0.35,
temperature=1.00,
max_completion_tokens=512,
)
response: Optional[str] = chat_completion.choices[0].message.content
return response


@@ -78,7 +78,7 @@ class Aggregate:
traceback.print_exc()
logging.info("Could not increment redis failed counter: %s", str(e))
self.notifier.send(
f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
f"Could not increment redis failed counter: {str(e)}",
)
return search_result


@@ -22,7 +22,7 @@ class Cache:
def __init__(self) -> None:
self.cache_db: Union[str, LiteralString] = os.path.join(
"/", "usr", "local", "share", "sqlite_dbs", "cached_lyrics.db"
"/usr/local/share", "sqlite_dbs", "cached_lyrics.db"
)
self.redis_cache = redis_cache.RedisCache()
self.notifier = notifier.DiscordNotifier()
@@ -32,7 +32,7 @@ class Cache:
pragma temp_store = memory; pragma mmap_size = 30000000000;"
)
self.sqlite_exts: list[str] = [
"/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
"/home/kyle/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
]
self.label: str = "Cache"
@@ -66,6 +66,8 @@ class Cache:
confidence=row["confidence"],
)
else:
if not sqlite_rows:
return None
for row in sqlite_rows:
if row[0] == matched_id:
(_id, artist, song, lyrics, original_src) = row[:-1]
@@ -89,10 +91,8 @@ class Cache:
logging.debug(
"Checking whether %s is already stored", artistsong.replace("\n", " - ")
)
check_query: str = (
'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
check_query: str = 'SELECT id, artist, song FROM lyrics WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
)
artistsong_split = artistsong.split("\n", maxsplit=1)
artist = artistsong_split[0].lower()
song = artistsong_split[1].lower()
@@ -132,7 +132,7 @@ class Cache:
f"cache::store >> {str(e)}",
)
await self.notifier.send(
f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}",
f"ERROR @ {__file__.rsplit('/', maxsplit=1)[-1]}",
f"cache::store >> `{str(e)}`",
)
@@ -213,8 +213,10 @@ class Cache:
lyrics = regex.sub(r"(<br>|\n|\r\n)", " / ", lyr_result.lyrics.strip())
lyrics = regex.sub(r"\s{2,}", " ", lyrics)
insert_query = "INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics)\
insert_query = (
"INSERT INTO lyrics (src, date_retrieved, artist, song, artistsong, confidence, lyrics)\
VALUES(?, ?, ?, ?, ?, ?, ?)"
)
params = (
lyr_result.src,
time.time(),
@@ -233,8 +235,8 @@ class Cache:
await db_conn.commit()
logging.info("Stored %s to SQLite!", artistsong.replace("\n", " - "))
return _cursor.lastrowid
except:
logging.critical("Cache storage error!")
except Exception as e:
logging.critical("Cache storage error: %s", str(e))
traceback.print_exc()
async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
@@ -258,10 +260,8 @@ class Cache:
if artist == "!" and song == "!":
random_search = True
search_query: str = (
"SELECT id, artist, song, lyrics, src, confidence\
search_query: str = "SELECT id, artist, song, lyrics, src, confidence\
FROM lyrics ORDER BY RANDOM() LIMIT 1"
)
logging.info("Searching %s - %s on %s", artist, song, self.label)
@@ -306,7 +306,8 @@ class Cache:
)
await self.redis_cache.increment_found_count(self.label)
return matched
except:
except Exception as e:
logging.debug(str(e))
pass
"""SQLite: Fallback"""
@@ -319,11 +320,9 @@ class Cache:
self.cache_pre_query
) as _db_cursor:
if not random_search:
search_query: str = (
'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\
search_query: str = 'SELECT id, artist, song, lyrics, src, confidence FROM lyrics\
WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 10'
)
search_params: tuple = (
artist.strip(),
song.strip(),
@@ -358,5 +357,6 @@ class Cache:
matched.time = time_diff
await self.redis_cache.increment_found_count(self.label)
return matched
except:
except Exception as e:
logging.info("Exception: %s", str(e))
traceback.print_exc()
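
Both editdist3() queries above depend on SQLite's spellfix1 extension, loaded from self.sqlite_exts; a minimal standalone sketch of that setup (same .so path as the diff — adjust per build):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.enable_load_extension(True)
conn.load_extension("/home/kyle/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so")
# editdist3 returns a weighted edit distance (~100 per single-character edit by
# default), so the <= 410 threshold above tolerates roughly four edits.
print(conn.execute("SELECT editdist3('nirvana', 'nirvanna')").fetchone()[0])  # ~100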


@@ -1,4 +1,7 @@
SCRAPE_HEADERS: dict[str, str] = {
"accept": "*/*",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "en-US,en;q=0.5",
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0",
}


@@ -8,6 +8,7 @@ import re
from typing import Optional
from aiohttp import ClientTimeout, ClientSession
from bs4 import BeautifulSoup, ResultSet # type: ignore
from tenacity import retry, stop_after_attempt, wait_fixed
import html as htm
from . import private, common, cache, redis_cache
from lyric_search import utils
@@ -27,12 +28,13 @@ class Genius:
self.genius_url: str = private.GENIUS_URL
self.genius_search_url: str = f"{self.genius_url}api/search/song?q="
self.headers: dict = common.SCRAPE_HEADERS
self.timeout = ClientTimeout(connect=3, sock_read=5)
self.timeout = ClientTimeout(connect=5, sock_read=5)
self.datautils = utils.DataUtils()
self.matcher = utils.TrackMatcher()
self.cache = cache.Cache()
self.redis_cache = redis_cache.RedisCache()
@retry(stop=stop_after_attempt(3), wait=wait_fixed(0.2))
async def search(self, artist: str, song: str, **kwargs) -> Optional[LyricsResult]:
"""
Genius Search
@@ -43,8 +45,8 @@ class Genius:
Optional[LyricsResult]: The result, if found - None otherwise.
"""
try:
artist: str = artist.strip().lower()
song: str = song.strip().lower()
artist = artist.strip().lower()
song = song.strip().lower()
time_start: float = time.time()
logging.info("Searching %s - %s on %s", artist, song, self.label)
search_term: str = f"{artist}%20{song}"
@@ -54,6 +56,7 @@ class Genius:
f"{self.genius_search_url}{search_term}",
timeout=self.timeout,
headers=self.headers,
proxy=private.GENIUS_PROXY,
) as request:
request.raise_for_status()
text: Optional[str] = await request.text()
@@ -101,11 +104,15 @@ class Genius:
best_match: tuple = self.matcher.find_best_match(
input_track=searched, candidate_tracks=to_scrape
)
logging.info("To scrape: %s", to_scrape)
((scrape_stub, track), confidence) = best_match
scrape_url: str = f"{self.genius_url}{scrape_stub[1:]}"
async with client.get(
scrape_url, timeout=self.timeout, headers=self.headers
scrape_url,
timeout=self.timeout,
headers=self.headers,
proxy=private.GENIUS_PROXY,
) as scrape_request:
scrape_request.raise_for_status()
scrape_text: Optional[str] = await scrape_request.text()
@@ -166,5 +173,6 @@ class Genius:
await self.redis_cache.increment_found_count(self.label)
await self.cache.store(matched)
return matched
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
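
The @retry decorator added above re-runs the coroutine up to three times with a fixed 0.2 s pause; a self-contained sketch of the same tenacity pattern:

import asyncio
from tenacity import retry, stop_after_attempt, wait_fixed

attempts = 0

@retry(stop=stop_after_attempt(3), wait=wait_fixed(0.2))
async def flaky() -> str:
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient")  # first two attempts fail
    return "ok"

print(asyncio.run(flaky()))  # "ok" on the third attempt

Note that search() traps its own exceptions, so the decorator only re-fires on errors raised outside that try block.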


@@ -6,6 +6,7 @@ import traceback
import logging
from typing import Optional, Union
from aiohttp import ClientTimeout, ClientSession
from tenacity import retry, stop_after_attempt, wait_fixed
from lyric_search import utils
from lyric_search.constructors import LyricsResult
from . import common, cache, redis_cache
@@ -22,12 +23,13 @@ class LRCLib:
self.label: str = "LRCLib"
self.lrclib_url: str = "https://lrclib.net/api/search"
self.headers: dict = common.SCRAPE_HEADERS
self.timeout = ClientTimeout(connect=2, sock_read=4)
self.timeout = ClientTimeout(connect=3, sock_read=8)
self.datautils = utils.DataUtils()
self.matcher = utils.TrackMatcher()
self.cache = cache.Cache()
self.redis_cache = redis_cache.RedisCache()
@retry(stop=stop_after_attempt(2), wait=wait_fixed(0.5))
async def search(
self, artist: str, song: str, plain: Optional[bool] = True
) -> Optional[LyricsResult]:
@@ -99,9 +101,13 @@ class LRCLib:
if isinstance(result["syncedLyrics"], str)
]
best_match = self.matcher.find_best_match(
input_track, possible_matches
)[0]
best_match = None
try:
best_match = self.matcher.find_best_match(
input_track, possible_matches
)[0]
except:  # noqa
pass
if not best_match:
return
best_match_id = best_match[0]
@@ -156,7 +162,9 @@ class LRCLib:
time=time_diff,
)
await self.redis_cache.increment_found_count(self.label)
await self.cache.store(matched)
if plain:
await self.cache.store(matched)
return matched
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()


@@ -14,7 +14,7 @@ from lyric_search.constructors import LyricsResult
import redis.asyncio as redis
from redis.commands.search.query import Query # type: ignore
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # type: ignore
from redis.commands.search.field import TextField, TagField # type: ignore
from redis.commands.search.field import TextField # type: ignore
from redis.commands.json.path import Path # type: ignore
from . import private
@@ -204,8 +204,8 @@ class RedisCache:
)
return search_res_out
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
# await self.notifier.send(f"ERROR @ {__file__.rsplit("/", maxsplit=1)[-1]}", f"{str(e)}\nSearch was: {artist} - {song}; fuzzy: {fuzzy_artist} - {fuzzy_song}")
return None
async def redis_store(self, sqlite_id: int, lyr_result: LyricsResult) -> None:


@@ -1,5 +1,5 @@
from difflib import SequenceMatcher
from typing import List, Optional, Union, Any
from typing import List, Optional
import logging
import regex
from regex import Pattern
@@ -111,8 +111,21 @@ class DataUtils:
"""
def __init__(self) -> None:
self.lrc_regex = regex.compile(
r"\[([0-9]{2}:[0-9]{2})\.[0-9]{1,3}\](\s(.*)){0,}"
self.lrc_regex = (
regex.compile( # capture mm:ss and optional .xxx, then the lyric text
r"""
\[ # literal “[”
( # 1st capture group: the timestamp
[0-9]{2} # two-digit minutes
:[0-9]{2} # colon + two-digit seconds
(?:\.[0-9]{1,3})? # optional decimal part, e.g. .123
)
\] # literal “]”
\s* # optional whitespace
(.*) # capture the rest of the line as words
""",
regex.VERBOSE,
)
)
self.scrub_regex_1: Pattern = regex.compile(r"(\[.*?\])(\s){0,}(\:){0,1}")
self.scrub_regex_2: Pattern = regex.compile(
@@ -161,7 +174,7 @@ class DataUtils:
)
_timetag = reg_helper[0]
if not reg_helper[1].strip():
_words = ""
continue
else:
_words = reg_helper[1].strip()
lrc_out.append(

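A quick check of the rewritten pattern against a typical LRC line (regex here is the third-party module imported above, not stdlib re):

import regex

lrc_regex = regex.compile(
    r"""
    \[                        # literal "["
    ([0-9]{2}:[0-9]{2}        # mm:ss
    (?:\.[0-9]{1,3})?)        # optional .xxx
    \]                        # literal "]"
    \s*
    (.*)                      # lyric text
    """,
    regex.VERBOSE,
)

print(lrc_regex.findall("[00:12.345] Hello world"))
# -> [('00:12.345', 'Hello world')]

Timestamp-only lines now yield an empty second group, which the continue above drops instead of emitting empty words.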

@@ -31,7 +31,6 @@ class LastFM:
]
async with ClientSession() as session:
async with await session.get(
self.api_base_url,
params=request_params,
@@ -51,7 +50,8 @@ class LastFM:
.split("<a href")[0],
}
return ret_obj
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return {
"err": "Failed",
@@ -92,18 +92,24 @@ class LastFM:
) as request:
request.raise_for_status()
data: dict = await request.json()
if not data:
return None
data = data.get("track", None)
if not isinstance(data.get("artist"), dict):
return None
artist_mbid: int = data.get("artist", None).get("mbid")
album: str = data.get("album", None).get("title")
album: str = data.get("album", None)
if not isinstance(album, dict):
return None
album = album.get("title")
ret_obj: dict = {
"artist_mbid": artist_mbid,
"album": album,
}
return ret_obj
except:
except Exception as e:
traceback.print_exc()
logging.debug("Exception: %s", str(e))
return {
"err": "General Failure",
}
@@ -133,8 +139,9 @@ class LastFM:
return ret_obj
except:
except Exception as e:
traceback.print_exc()
logging.debug("Exception: %s", str(e))
return {
"err": "General Failure",
}
@@ -179,7 +186,8 @@ class LastFM:
and int(item.get("playcount")) >= 50
]
return ret_obj
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return {
"err": "Failed",
@@ -202,7 +210,8 @@ class LastFM:
return -1
artist_id: int = int(artist_search[0].get("id", 0))
return artist_id
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return -1
@@ -253,7 +262,8 @@ class LastFM:
"members": members,
}
return ret_obj
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return {
"err": "Failed",
@@ -285,7 +295,8 @@ class LastFM:
"err": "Failed",
}
return artist_info
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return {
"err": "Failed",
@@ -338,7 +349,8 @@ class LastFM:
}
try:
track_key: list = data.get("tracks", None).get("track")
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
track_key = []
if isinstance(track_key, list):
ret_obj["tracks"] = [
@@ -358,7 +370,8 @@ class LastFM:
}
]
return ret_obj
except:
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return {
"err": "Failed",

utils/meme_util.py Normal file

@@ -0,0 +1,194 @@
import os
import logging
import io
import traceback
import math
from typing import Optional
import aiosqlite as sqlite3
from PIL import Image
class MemeUtil:
"""
Meme Utils
"""
def __init__(self, constants) -> None:
self.constants = constants
self.meme_db_path = os.path.join("/usr/local/share", "sqlite_dbs", "meme.db")
def is_png(self, buffer: bytes | io.BytesIO) -> bool:
"""
Check if image (in-memory buffer, or bytes) is a PNG
Args:
buffer (bytes|io.BytesIO)
Returns:
bool
"""
# Accepts either bytes or a BytesIO-like object
signature = None
if isinstance(buffer, io.BytesIO):
if hasattr(buffer, "read") and hasattr(buffer, "seek"):
pos = buffer.tell()
buffer.seek(0)
signature = buffer.read(8)
buffer.seek(pos)
else:
signature = buffer[:8]
return signature == b"\x89PNG\r\n\x1a\n"
def convert_to_png(self, in_buffer: io.BytesIO) -> bytes:
"""
Convert an in-memory buffer to PNG
Args:
in_buffer (io.BytesIO)
Returns:
bytes
"""
in_buffer.seek(0)
with Image.open(in_buffer) as im:
if im.format == "PNG":
raise ValueError("Already a PNG")
out_buffer = io.BytesIO()
im.save(out_buffer, format="PNG")
out_buffer.seek(0)
return out_buffer.read()
async def get_meme_by_id(self, meme_id: int) -> Optional[bytes]:
"""
Get meme by id
Args:
meme_id (int)
Returns:
Optional[bytes]
"""
ret_image: Optional[bytes] = None
buffer: Optional[io.BytesIO] = None
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT image FROM memes WHERE id = ? LIMIT 1"
async with await db_conn.execute(query, (meme_id,)) as db_cursor:
result = await db_cursor.fetchone()
if not result:
return None
buffer = io.BytesIO(result["image"])
is_png = self.is_png(buffer)
if not is_png:
logging.debug(
"Converting meme_id: %s, not detected as PNG", meme_id
)
ret_image = self.convert_to_png(buffer)
converted = await self.replace_with_converted_png(
meme_id, ret_image
)
if converted:
logging.info("Converted meme_id: %s", meme_id)
else:
logging.info("Failed to convert meme_id: %s", meme_id)
else:
ret_image = result["image"]
return ret_image
async def get_random_meme(self) -> Optional[bytes]:
"""
Get random meme
Returns:
Optional[bytes]
"""
ret_image: Optional[bytes] = None
buffer: Optional[io.BytesIO] = None
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT id, image FROM memes ORDER BY RANDOM() LIMIT 1"
async with await db_conn.execute(query) as db_cursor:
result = await db_cursor.fetchone()
if not result:
return None
meme_id = result["id"]
buffer = io.BytesIO(result["image"])
is_png = self.is_png(buffer)
if not is_png:
logging.debug("Converting %s, not detected as PNG", meme_id)
ret_image = self.convert_to_png(buffer)
else:
ret_image = result["image"]
return ret_image
async def list_memes(self, page: int) -> Optional[list]:
"""
List memes (paginated)
Args:
page (int)
Returns:
Optional[list]
"""
out_result: list = []
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
rows_per_page: int = 10
offset: int = (page - 1) * rows_per_page
query: str = "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT 10 OFFSET ?"
async with await db_conn.execute(query, (offset,)) as db_cursor:
results = await db_cursor.fetchall()
for result in results:
result_id = result["id"]
result_timestamp = result["timestamp"]
out_result.append(
{
"id": result_id,
"timestamp": result_timestamp,
}
)
return out_result
async def get_page_count(self) -> Optional[int]:
"""
Get page count
Returns:
Optional[int]
"""
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
rows_per_page: int = 10
pages: Optional[int] = None
query: str = "SELECT count(id) AS count FROM memes"
async with await db_conn.execute(query) as db_cursor:
result = await db_cursor.fetchone()
if not result:
return None
count = result["count"]
if not isinstance(count, int):
return None
pages = math.ceil(count / rows_per_page)
return pages
async def replace_with_converted_png(self, meme_id: int, meme_image: bytes) -> bool:
"""
Replace stored image with converted PNG
Args:
meme_id (int)
meme_image (bytes)
Returns:
bool
"""
update_query: str = "UPDATE memes SET image = ?, file_ext = 'PNG' WHERE id = ?"
params: tuple = (
meme_image,
meme_id,
)
try:
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
update = await db_conn.execute_insert(update_query, params)
if not update:
logging.info(
"replace_with_converted_png: Failed -> Update: %s\nFor meme_id: %s",
update,
meme_id,
)
return False
else:
return True
except Exception as e:
logging.info("replace_with_converted_png: %s", str(e))
traceback.print_exc()
return False
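
A minimal async sketch of driving the new utility; the memes table schema (id, image, timestamp, file_ext) is inferred from the queries above, and constants is unused by the methods shown:

import asyncio
from utils.meme_util import MemeUtil

async def main() -> None:
    meme_util = MemeUtil(constants=None)  # constants isn't touched here
    image = await meme_util.get_random_meme()
    if image:
        print(f"{len(image)} bytes, PNG: {meme_util.is_png(image)}")
    print("pages:", await meme_util.get_page_count())

asyncio.run(main())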


@@ -3,43 +3,42 @@ import traceback
import time
import datetime
import os
import random
from uuid import uuid4 as uuid
from typing import Union, Optional, Iterable
from aiohttp import ClientSession, ClientTimeout
import regex
from regex import Pattern
import aiosqlite as sqlite3
import sqlite3
import gpt
import music_tag # type: ignore
from rapidfuzz import fuzz
from endpoints.constructors import RadioException
import redis.asyncio as redis
from redis.commands.search.query import Query # noqa
from redis.commands.search.indexDefinition import IndexDefinition, IndexType # noqa
from redis.commands.search.field import TextField # noqa
from redis.commands.json.path import Path # noqa
from lyric_search.sources import private
double_space: Pattern = regex.compile(r"\s{2,}")
"""
TODO:
- Album art rework
- get_genre should only be called once for load_playlist, rework get_genre to (optionally) accept a list of artists,
and return (optionally) a list instead of a str
- Allow tracks to be queried again based on genre; unable to query tracks based on genre presently,
as genre was moved outside track_file_map to artist_genre_map
- Ask GPT when we encounter an untagged (no genre defined) artist, automation is needed for this tedious task
- etc..
"""
non_alnum: Pattern = regex.compile(r"[^a-zA-Z0-9]")
class RadioUtil:
"""
Radio Utils
"""
def __init__(self, constants) -> None:
def __init__(self, constants, loop) -> None:
self.constants = constants
self.loop = loop
self.gpt = gpt.GPT(self.constants)
self.ls_uri: str = self.constants.LS_URI
self.redis_client = redis.Redis(password=private.REDIS_PW)
self.sqlite_exts: list[str] = [
"/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
"/home/kyle/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so"
]
self.active_playlist_path: str = os.path.join(
self.playback_db_path: str = os.path.join(
"/usr/local/share", "sqlite_dbs", "track_file_map.db"
)
self.artist_genre_db_path: str = os.path.join(
@@ -48,26 +47,62 @@ class RadioUtil:
self.album_art_db_path: str = os.path.join(
"/usr/local/share", "sqlite_dbs", "track_album_art.db"
)
self.active_playlist_name = "default" # not used
self.active_playlist: list[dict] = []
self.now_playing: dict = {
"artist": "N/A",
"song": "N/A",
"album": "N/A",
"genre": "N/A",
"artistsong": "N/A - N/A",
"duration": 0,
"start": 0,
"end": 0,
"file_path": None,
"id": None,
self.db_queries = {
'main': self.constants.RADIO_DB_QUERY,
'rap': self.constants.RADIO_DB_QUERY_RAP,
'pop': self.constants.RADIO_DB_QUERY_POP,
# 'classical': self.constants.RADIO_DB_QUERY_CLASSICAL,
'rock': self.constants.RADIO_DB_QUERY_ROCK,
'electronic': self.constants.RADIO_DB_QUERY_ELECTRONIC,
}
self.playback_genres: list[str] = [
# "metal",
# # "hip hop",
# "metalcore",
# "deathcore",
# "edm",
# "electronic",
# "post-hardcore",
# "post hardcore",
# # "hard rock",
# # "rock",
# # # "ska",
# # "post punk",
# # "post-punk",
# # "pop punk",
# # "pop-punk",
]
self.playlists: list = [
"main",
"rock",
"rap",
"electronic",
"pop",
]
self.active_playlist: dict[str, list[dict]] = {}
self.playlists_loaded: bool = False
self.now_playing: dict[str, dict] = {
playlist: {
"artist": "N/A",
"song": "N/A",
"album": "N/A",
"genre": "N/A",
"artistsong": "N/A - N/A",
"duration": 0,
"start": 0,
"end": 0,
"file_path": None,
"id": None,
} for playlist in self.playlists
}
self.webhooks: dict = {
"gpt": {
"hook": self.constants.GPT_WEBHOOK,
"lastRun": None,
},
"sfm": {
"hook": self.constants.SFM_WEBHOOK,
"lastRun": None,
},
}
@@ -81,7 +116,7 @@ class RadioUtil:
"""
return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]
async def trackdb_typeahead(self, query: str) -> Optional[list[str]]:
def trackdb_typeahead(self, query: str) -> Optional[list[str]]:
"""
Query track db for typeahead
Args:
@@ -91,22 +126,54 @@ class RadioUtil:
"""
if not query:
return None
async with sqlite3.connect(self.active_playlist_path, timeout=1) as _db:
with sqlite3.connect(self.playback_db_path, timeout=1) as _db:
_db.row_factory = sqlite3.Row
db_query: str = """SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))),\
(TRIM(artist) || " - " || TRIM(song)) as artistsong FROM tracks WHERE\
artistsong LIKE ? LIMIT 30"""
db_params: tuple[str] = (f"%{query}%",)
async with _db.execute(db_query, db_params) as _cursor:
result: Iterable[sqlite3.Row] = await _cursor.fetchall()
out_result = [str(r["artistsong"]) for r in result]
return out_result
_cursor = _db.execute(db_query, db_params)
result: Iterable[sqlite3.Row] = _cursor.fetchall()
out_result = [str(r["artistsong"]) for r in result]
return out_result
async def search_playlist(
def datatables_search(self,
filter: str,
station: str = "main") -> Optional[list[dict]]:
"""DataTables Search
Args:
filter (str): The filter query to fuzzy match with
Returns:
list[dict]: List of matching playlist items (if any are found)
"""
filter = filter.strip().lower()
matched: list[dict] = []
for item in self.active_playlist[station]:
artist: str = item.get("artist", "")
song: str = item.get("song", "")
artistsong: str = item.get("artistsong", "")
album: str = item.get("album", "")
if not artist or not song or not artistsong:
continue
if non_alnum.sub("", filter) in non_alnum.sub("", artistsong).lower():
matched.append(item)
continue
if (
fuzz.ratio(filter, artist) >= 85
or fuzz.ratio(filter, song) >= 85
or fuzz.ratio(filter, album) >= 85
):
matched.append(item)
return matched
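# Note: rapidfuzz's fuzz.ratio is a 0-100 similarity score, e.g.
# fuzz.ratio("nirvana", "nirvanna") is ~93, so the >= 85 cutoff above
# still catches near-miss spellings.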
def search_db(
self,
artistsong: Optional[str] = None,
artist: Optional[str] = None,
song: Optional[str] = None,
station: str = "main"
) -> bool:
"""
Search for track, add it up next in play queue if found
@@ -122,11 +189,9 @@ class RadioUtil:
if not artistsong and (not artist or not song):
raise RadioException("No query provided")
try:
search_query: str = (
'SELECT id, artist, song, (artist || " - " || song) AS artistsong, album, file_path, duration FROM tracks\
search_query: str = 'SELECT id, artist, song, (artist || " - " || song) AS artistsong, album, file_path, duration FROM tracks\
WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?))\
<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
)
if artistsong:
artistsong_split: list = artistsong.split(" - ", maxsplit=1)
(search_artist, search_song) = tuple(artistsong_split)
@@ -135,42 +200,42 @@ class RadioUtil:
search_song = song
if not artistsong:
artistsong = f"{search_artist} - {search_song}"
if not search_artist or not search_song or not artistsong:
raise RadioException("No query provided")
search_params = (
search_artist.lower(),
search_song.lower(),
artistsong.lower(),
)
async with sqlite3.connect(self.active_playlist_path, timeout=2) as db_conn:
await db_conn.enable_load_extension(True)
with sqlite3.connect(self.playback_db_path, timeout=2) as db_conn:
db_conn.enable_load_extension(True)
for ext in self.sqlite_exts:
await db_conn.load_extension(ext)
db_conn.load_extension(ext)
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(
search_query, search_params
) as db_cursor:
result: Optional[sqlite3.Row | bool] = await db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row):
return False
push_obj: dict = {
"id": result["id"],
"uuid": str(uuid().hex),
"artist": double_space.sub(" ", result["artist"].strip()),
"song": double_space.sub(" ", result["song"].strip()),
"artistsong": result["artistsong"].strip(),
"genre": await self.get_genre(
double_space.sub(" ", result["artist"].strip())
),
"file_path": result["file_path"],
"duration": result["duration"],
}
self.active_playlist.insert(0, push_obj)
return True
db_cursor = db_conn.execute(search_query, search_params)
result: Optional[sqlite3.Row | bool] = db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row):
return False
push_obj: dict = {
"id": result["id"],
"uuid": str(uuid().hex),
"artist": double_space.sub(" ", result["artist"].strip()),
"song": double_space.sub(" ", result["song"].strip()),
"artistsong": result["artistsong"].strip(),
"genre": self.get_genre(
double_space.sub(" ", result["artist"].strip())
),
"file_path": result["file_path"],
"duration": result["duration"],
}
self.active_playlist[station].insert(0, push_obj)
return True
except Exception as e:
logging.critical("search_playlist:: Search error occurred: %s", str(e))
logging.critical("search_db:: Search error occurred: %s", str(e))
traceback.print_exc()
return False
async def add_genre(self, artist: str, genre: str) -> bool:
def add_genre(self, artist: str, genre: str) -> bool:
"""
Add artist/genre pairing to DB
Args:
@@ -180,21 +245,115 @@ class RadioUtil:
bool
"""
try:
async with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
query: str = "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
query: str = (
"INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
)
params: tuple[str, str] = (artist, genre)
res = await _db.execute_insert(query, params)
if res:
res = _db.execute(query, params)
if isinstance(res.lastrowid, int):
logging.debug(
"Query executed successfully for %s/%s, committing",
artist,
genre,
)
_db.commit()
return True
logging.debug(
"Failed to store artist/genre pair: %s/%s (res: %s)", artist, genre, res
)
return False
except Exception as e:
logging.info("Failed to store artist/genre pair: %s/%s (%s)",
artist, genre, str(e))
logging.info(
"Failed to store artist/genre pair: %s/%s (%s)", artist, genre, str(e)
)
traceback.print_exc()
return False
def add_genres(self, pairs: list[dict[str, str]]) -> bool:
"""
(BATCH) Add artist/genre pairings to DB
Expects list of dicts comprised of artist name (key), genre (value)
Args:
pairs (list[dict[str, str]]): Pairs of artist/genres to add, list of dicts
Returns:
bool
"""
try:
added_rows: int = 0
artist = None
genre = None
with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
for pair in pairs:
try:
artist, genre = next(iter(pair.items()))  # each pair is a single-entry dict: {artist: genre}
query: str = "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)"
params: tuple[str, str] = (artist, genre)
res = _db.execute(query, params)
if isinstance(res.lastrowid, int):
logging.debug(
"add_genres: Query executed successfully for %s/%s",
artist,
genre,
)
added_rows += 1
else:
logging.debug(
"Failed to store artist/genre pair: %s/%s (res: %s)",
artist,
genre,
res,
)
except Exception as e:
logging.info(
"Failed to store artist/genre pair: %s/%s (%s)",
artist,
genre,
str(e),
)
continue
if added_rows:
logging.info("add_genres: Committing %s rows", added_rows)
_db.commit()
return True
logging.info("add_genres: Failed (No rows added)")
return False
except Exception as e:
logging.info("Failed to store artist/genre pairs: %s", str(e))
traceback.print_exc()
return False
async def get_genre(self, artist: str) -> str:
def get_genres(self, input_artists: list[str]) -> dict:
"""
Retrieve genres for given list of artists
Batch equivalent of get_genre
Args:
input_artists (list): The artists to query
Returns:
dict[str, str]
"""
time_start: float = time.time()
artist_genre: dict[str, str] = {}
query: str = (
"SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE"
)
with sqlite3.connect(self.artist_genre_db_path) as _db:
_db.row_factory = sqlite3.Row
for artist in input_artists:
params: tuple[str] = (f"%%{artist}%%",)
_cursor = _db.execute(query, params)
res = _cursor.fetchone()
if not res:
artist_genre[artist] = "N/A"
continue
artist_genre[artist] = res["genre"]
time_end: float = time.time()
logging.info(f"Time taken: {time_end - time_start}")
return artist_genre
def get_genre(self, artist: str) -> str:
"""
Retrieve Genre for given Artist
Args:
@@ -204,92 +363,101 @@ class RadioUtil:
"""
try:
artist = artist.strip()
query: str = "SELECT genre FROM artist_genre WHERE artist LIKE ?"
params: tuple[str] = (f"%%{artist}%%",)
async with sqlite3.connect(self.artist_genre_db_path, timeout=2) as _db:
query: str = (
"SELECT genre FROM artist_genre WHERE artist LIKE ? COLLATE NOCASE"
)
params: tuple[str] = (artist,)
with sqlite3.connect(self.playback_db_path, timeout=2) as _db:
_db.row_factory = sqlite3.Row
async with await _db.execute(query, params) as _cursor:
res = await _cursor.fetchone()
if not res:
raise RadioException(
f"Could not locate {artist} in artist_genre_map db."
)
return res["genre"]
_cursor = _db.execute(query, params)
res = _cursor.fetchone()
if not res:
return "Not Found" # Exception suppressed
# raise RadioException(
# f"Could not locate {artist} in artist_genre_map db."
# )
return res["genre"]
except Exception as e:
logging.info("Failed to look up genre for artist: %s (%s)", artist, str(e))
traceback.print_exc()
return "Not Found"
async def load_playlist(self) -> None:
"""Load Playlist"""
async def load_playlists(self) -> None:
"""Load Playlists"""
try:
logging.info("Loading playlist...")
self.active_playlist.clear()
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
# GROUP BY artistdashsong ORDER BY RANDOM()'
logging.info("Loading playlists...")
if isinstance(self.active_playlist, dict):
self.active_playlist.clear()
"""
LIMITED GENRES
"""
db_query: str = (
'SELECT distinct(LOWER(TRIM(artist)) || " - " || LOWER(TRIM(song))), (TRIM(artist) || " - " || TRIM(song))'
"AS artistdashsong, id, artist, song, album, file_path, duration FROM tracks GROUP BY artistdashsong ORDER BY RANDOM()"
)
"""
LIMITED TO ONE/SMALL SUBSET OF GENRES
"""
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
# WHERE (artist LIKE "%winds of plague%" OR artist LIKE "%acacia st%" OR artist LIKE "%suicide si%" OR artist LIKE "%in dying%") AND (NOT song LIKE "%(live%") ORDER BY RANDOM()' #ORDER BY artist DESC, album ASC, song ASC'
"""
LIMITED TO ONE/SOME ARTISTS...
"""
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
# WHERE (artist LIKE "%rise against%" OR artist LIKE "%i prevail%" OR artist LIKE "%volumes%" OR artist LIKE "%movements%" OR artist LIKE "%woe%" OR artist LIKE "%smittyztop%" OR artist LIKE "%chunk! no,%" OR artist LIKE "%fame on fire%" OR artist LIKE "%our last night%" OR artist LIKE "%animal in me%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
# WHERE (artist LIKE "%sullivan king%" OR artist LIKE "%kayzo%" OR artist LIKE "%adventure club%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'
# db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
# WHERE (artist LIKE "%akira the don%") AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" AND NOT song LIKE "%%unzipped%%") GROUP BY artistdashsong ORDER BY RANDOM()'# ORDER BY album ASC, id ASC'
async with sqlite3.connect(
f"file:{self.active_playlist_path}?mode=ro", uri=True, timeout=15
) as db_conn:
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(db_query) as db_cursor:
results: list[sqlite3.Row] = await db_cursor.fetchall()
self.active_playlist = [
{
"uuid": str(uuid().hex),
"id": r["id"],
"artist": double_space.sub(" ", r["artist"]).strip(),
"song": double_space.sub(" ", r["song"]).strip(),
"album": double_space.sub(" ", r["album"]).strip(),
"genre": await self.get_genre(
double_space.sub(" ", r["artist"]).strip()
),
"artistsong": double_space.sub(
" ", r["artistdashsong"]
).strip(),
"file_path": r["file_path"],
"duration": r["duration"],
}
for r in results
]
logging.info(
"Populated active playlists with %s items",
len(self.active_playlist),
for playlist in self.playlists:
playlist_redis_key: str = f"playlist:{playlist}"
_playlist = await self.redis_client.json().get(playlist_redis_key) # type: ignore
if playlist not in self.active_playlist.keys():
self.active_playlist[playlist] = []
random.shuffle(_playlist)
self.active_playlist[playlist] = [
{
"uuid": str(uuid().hex),
"id": r["id"],
"artist": double_space.sub(" ", r["artist"]).strip(),
"song": double_space.sub(" ", r["song"]).strip(),
"album": double_space.sub(" ", r["album"]).strip(),
"genre": r["genre"] if r["genre"] else "Not Found",
"artistsong": double_space.sub(
" ", r["artistdashsong"]
).strip(),
"file_path": r["file_path"],
"duration": r["duration"],
} for r in _playlist
if r not in self.active_playlist[playlist]
]
logging.info(
"Populated playlist: %s with %s items",
playlist, len(self.active_playlist[playlist]),
)
"""Dedupe"""
logging.info("Removing duplicate tracks...")
dedupe_processed: set[str] = set()
deduped_playlist: list[dict] = []
for item in self.active_playlist[playlist]:
artistsongabc: str = non_alnum.sub("", item.get("artistsong", ""))
if not artistsongabc:
logging.info("Missing artistsong: %s", item)
deduped_playlist.append(item)  # keep; nothing to dedupe on
continue
if artistsongabc in dedupe_processed:
continue  # duplicate; skip (list.remove() mid-iteration skips elements)
dedupe_processed.add(artistsongabc)
deduped_playlist.append(item)
self.active_playlist[playlist] = deduped_playlist
logging.info(
"Duplicates for playlist: %s removed. New playlist size: %s",
playlist, len(self.active_playlist[playlist]),
)
if playlist == 'main' and self.playback_genres:
new_playlist: list[dict] = []
logging.info("Limiting playback genres")
for item in self.active_playlist[playlist]:
item_genres = item.get("genre", "").strip().lower()
# Check if any genre matches and item isn't already in new_playlist
if any(genre.strip().lower() in item_genres for genre in self.playback_genres):
if item not in new_playlist:
new_playlist.append(item)
self.active_playlist[playlist] = new_playlist
logging.info(
"%s items for playlist: %s remain for playback after filtering",
playlist, len(self.active_playlist[playlist]),
)
"""Loading Complete"""
logging.info(f"Skipping: {playlist}")
await self._ls_skip(playlist) # Request skip from LS to bring streams current
self.playlists_loaded = True
except Exception as e:
logging.info("Playlist load failed: %s", str(e))
traceback.print_exc()
async def cache_album_art(self, track_id: int, file_path: str) -> None:
def cache_album_art(self, track_id: int, file_path: str) -> None:
"""
Cache Album Art to SQLite DB
Args:
@@ -305,19 +473,27 @@ class RadioUtil:
)
tagger = music_tag.load_file(file_path)
album_art = tagger["artwork"].first.data
async with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
async with await db_conn.execute(
with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
db_cursor = db_conn.execute(
"INSERT OR IGNORE INTO album_art (track_id, album_art) VALUES(?, ?)",
(
track_id,
album_art,
),
) as db_cursor:
await db_conn.commit()
except:
)
if isinstance(db_cursor.lastrowid, int):
db_conn.commit()
else:
logging.debug(
"No row inserted for track_id: %s w/ file_path: %s",
track_id,
file_path,
)
except Exception as e:
logging.debug("cache_album_art Exception: %s", str(e))
traceback.print_exc()
async def get_album_art(self, track_id: int) -> Optional[bytes]:
def get_album_art(self, track_id: int) -> Optional[bytes]:
"""
Get Album Art
Args:
@@ -326,23 +502,24 @@ class RadioUtil:
Optional[bytes]
"""
try:
async with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
with sqlite3.connect(self.album_art_db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT album_art FROM album_art WHERE track_id = ?"
query_params: tuple[int] = (track_id,)
async with await db_conn.execute(query, query_params) as db_cursor:
result: Optional[Union[sqlite3.Row, bool]] = (
await db_cursor.fetchone()
)
if not result or not isinstance(result, sqlite3.Row):
return None
return result["album_art"]
except:
db_cursor = db_conn.execute(query, query_params)
result: Optional[Union[sqlite3.Row, bool]] = db_cursor.fetchone()
if not result or not isinstance(result, sqlite3.Row):
return None
return result["album_art"]
except Exception as e:
logging.debug("get_album_art Exception: %s", str(e))
traceback.print_exc()
return None
def get_queue_item_by_uuid(self, _uuid: str) -> Optional[tuple[int, dict]]:
def get_queue_item_by_uuid(self,
_uuid: str,
station: str = "main") -> Optional[tuple[int, dict]]:
"""
Get queue item by UUID
Args:
@@ -350,29 +527,31 @@ class RadioUtil:
Returns:
Optional[tuple[int, dict]]
"""
for x, item in enumerate(self.active_playlist):
for x, item in enumerate(self.active_playlist[station]):
if item.get("uuid") == _uuid:
return (x, item)
return None
async def _ls_skip(self) -> bool:
async def _ls_skip(self, station: str = "main") -> bool:
"""
Ask LiquidSoap server to skip to the next track
Args:
None
station (str): default "main"
Returns:
bool
"""
try:
async with ClientSession() as session:
async with session.get(
f"{self.ls_uri}/next", timeout=ClientTimeout(connect=2, sock_read=2)
async with session.post(
f"{self.ls_uri}/next",
data=station,
timeout=ClientTimeout(connect=2, sock_read=2)
) as request:
request.raise_for_status()
text: Optional[str] = await request.text()
return text == "OK"
return isinstance(text, str) and text.startswith("OK")
except Exception as e:
logging.debug("Skip failed: %s", str(e))
logging.critical("Skip failed: %s", str(e))
return False # failsafe
@@ -392,15 +571,20 @@ class RadioUtil:
return None
return response
async def webhook_song_change(self, track: dict) -> None:
async def webhook_song_change(self, track: dict, station: str = "main") -> None:
"""
Handles Song Change Outbounds (Webhooks)
Args:
track (dict)
station (str): default "main"
Returns:
None
"""
try:
"""TEMP - ONLY MAIN"""
if station != "main":
return
return # Temp disable global
# First, send track info
"""
TODO:
@@ -416,7 +600,7 @@ class RadioUtil:
"username": "serious.FM",
"embeds": [
{
"title": "Now Playing",
"title": f"Now Playing on {station.title()}",
"description": f"## {track['song']}\nby\n## {track['artist']}",
"color": 0x30C56F,
"thumbnail": {
@@ -448,7 +632,9 @@ class RadioUtil:
{
"name": "Album",
"value": (
track["album"] if track["album"] else "Unknown"
track["album"]
if "album" in track.keys()
else "Unknown"
),
},
],
@@ -456,48 +642,60 @@ class RadioUtil:
],
}
sfm_hook: str = self.webhooks["sfm"].get("hook")
async with ClientSession() as session:
async with await session.post(
sfm_hook,
json=hook_data,
timeout=ClientTimeout(connect=5, sock_read=5),
headers={
"content-type": "application/json; charset=utf-8",
},
) as request:
request.raise_for_status()
now: float = time.time()
_sfm: dict = self.webhooks["sfm"]
if _sfm:
sfm_hook: str = _sfm.get("hook", "")
sfm_hook_lastRun: Optional[float] = _sfm.get("lastRun", 0.0)
# Next, AI feedback
if sfm_hook_lastRun and ((now - sfm_hook_lastRun) < 5):
logging.info("SFM Webhook: Throttled!")
return
async with ClientSession() as session:
async with await session.post(
sfm_hook,
json=hook_data,
timeout=ClientTimeout(connect=5, sock_read=5),
headers={
"content-type": "application/json; charset=utf-8",
},
) as request:
request.raise_for_status()
ai_response: Optional[str] = await self.get_ai_song_info(
track["artist"], track["song"]
)
if not ai_response:
return
# Next, AI feedback (for main stream only)
"""
TEMP. DISABLED
"""
hook_data = {
"username": "GPT",
"embeds": [
{
"title": "AI Feedback",
"color": 0x35D0FF,
"description": ai_response.strip(),
}
],
}
# if station == "main":
# ai_response: Optional[str] = await self.get_ai_song_info(
# track["artist"], track["song"]
# )
# if not ai_response:
# return
ai_hook: str = self.webhooks["gpt"].get("hook")
async with ClientSession() as session:
async with await session.post(
ai_hook,
json=hook_data,
timeout=ClientTimeout(connect=5, sock_read=5),
headers={
"content-type": "application/json; charset=utf-8",
},
) as request:
request.raise_for_status()
# hook_data = {
# "username": "GPT",
# "embeds": [
# {
# "title": "AI Feedback",
# "color": 0x35D0FF,
# "description": ai_response.strip(),
# }
# ],
# }
# ai_hook: str = self.webhooks["gpt"].get("hook")
# async with ClientSession() as session:
# async with await session.post(
# ai_hook,
# json=hook_data,
# timeout=ClientTimeout(connect=5, sock_read=5),
# headers={
# "content-type": "application/json; charset=utf-8",
# },
# ) as request:
# request.raise_for_status()
except Exception as e:
logging.info("Webhook error occurred: %s", str(e))
traceback.print_exc()
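
Closing out radio_util, a standalone sketch of the station-aware LiquidSoap skip introduced above (ls_uri is a placeholder for the LS HTTP endpoint; posting the station name and checking the "OK" prefix mirror the diff):

import asyncio
from aiohttp import ClientSession, ClientTimeout

async def ls_skip(ls_uri: str, station: str = "main") -> bool:
    async with ClientSession() as session:
        async with session.post(
            f"{ls_uri}/next",
            data=station,
            timeout=ClientTimeout(connect=2, sock_read=2),
        ) as request:
            request.raise_for_status()
            text = await request.text()
            return isinstance(text, str) and text.startswith("OK")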

utils/rip_background.py Normal file

@@ -0,0 +1,401 @@
import logging
import asyncio
import random
import os
import tarfile
import traceback
import uuid
import subprocess
import shutil
import re
from pathlib import Path
from urllib.parse import urlparse, unquote
import aiohttp
from datetime import datetime
from mediafile import MediaFile # type: ignore[import]
from rq import get_current_job
from utils.sr_wrapper import SRUtil
# ---------- Config ----------
ROOT_DIR = Path("/storage/music2")
MAX_RETRIES = 5
THROTTLE_MIN = 1.0
THROTTLE_MAX = 3.5
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/116.0.5845.97 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Connection": "keep-alive",
}
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
sr = SRUtil()
# ---------- Helpers ----------
def tag_with_mediafile(file_path: str, meta: dict):
f = MediaFile(file_path)
# --- Helper to safely set textual/number fields ---
def safe_set(attr, value, default=None, cast=None):
if value is None:
value = default
if value is not None:
if cast is not None:
setattr(f, attr, cast(value))
else:
setattr(f, attr, str(value))
# --- Basic textual metadata ---
safe_set("title", meta.get("title"), default="Unknown Title")
safe_set("artist", meta.get("artist"), default="Unknown Artist")
safe_set("albumartist", meta.get("album_artist"), default="Unknown Artist")
safe_set("album", meta.get("album"), default="Unknown Album")
safe_set("track", meta.get("track_number"), default=0, cast=int)
safe_set("disc", meta.get("disc_number"), default=0, cast=int)
safe_set("isrc", meta.get("isrc"), default="")
safe_set("bpm", meta.get("bpm"), default=0, cast=int)
# --- Release date ---
release_date_str = meta.get("release_date")
release_date_obj = None
if release_date_str:
try:
release_date_obj = datetime.fromisoformat(release_date_str).date()
except ValueError:
try:
# fallback if only year string
release_date_obj = datetime(int(release_date_str[:4]), 1, 1).date()
except Exception:
pass
if release_date_obj:
f.date = release_date_obj
# --- Save all tags ---
f.save()
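# Example (illustrative values only):
# tag_with_mediafile("/tmp/track.flac",
# {"title": "Song", "artist": "Artist", "track_number": 3})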
def cleanup_empty_dirs(root: Path):
"""
Recursively remove any directories under root that contain no files
(empty or only empty subdirectories).
"""
for dirpath, dirnames, filenames in os.walk(root, topdown=False):
p = Path(dirpath)
has_file = any(f.is_file() for f in p.rglob("*"))
if not has_file:
try:
p.rmdir()
except Exception:
pass
def sanitize_filename(name: str) -> str:
"""Make a string safe for file/dir names."""
if not name:
return "Unknown"
name = name.replace("/", "-").replace("\\", "-")
name = re.sub(r'[<>:"|?*\x00-\x1F]', "", name)
name = name.strip().strip(".")
name = re.sub(r"\s+", " ", name)
return name[:180] or "Unknown"
def ensure_unique_path(p: Path) -> Path:
"""
Ensure the given file or directory path is unique *within its parent folder*.
Only appends (2), (3)... if a real conflict exists in that folder.
"""
parent = p.parent
stem, suffix = p.stem, p.suffix
existing = {f.name for f in parent.glob(f"*{suffix}") if f.is_file()}
candidate = f"{stem}{suffix}"
if candidate not in existing:
return parent / candidate
counter = 2
while True:
candidate = f"{stem} ({counter}){suffix}"
if candidate not in existing:
return parent / candidate
counter += 1
# ---------- Job ----------
def bulk_download(track_list: list, quality: str = "FLAC"):
"""
RQ job:
- fetches stream URLs
- downloads with retries + throttling
- uses SR metadata to name/organize files
- creates ONE tarball for all tracks
- returns [tarball_path]
"""
job = get_current_job()
job_id = job.id if job else uuid.uuid4().hex
staging_root = ROOT_DIR / job_id
if job:
try:
job.meta["track_ids"] = [str(t) for t in (track_list or [])]
job.meta["tracks"] = []
job.meta["progress"] = 0
job.meta["tarball"] = None
job.meta["status"] = "Started"
job.save_meta()
except Exception as e:
logging.warning("Failed to init job.meta: %s", e)
async def process_tracks():
per_track_meta = []
all_final_files = []
all_artists = set()
(ROOT_DIR / "completed").mkdir(parents=True, exist_ok=True)
async with aiohttp.ClientSession(headers=HEADERS) as session:
total = len(track_list or [])
logging.critical("Total tracks to process: %s", total)
if job:
job.meta["progress"] = 0
job.save_meta()
for i, track_id in enumerate(track_list or []):
track_info = {
"track_id": str(track_id),
"status": "Pending",
"file_path": None,
"error": None,
"attempts": 0,
}
attempt = 0
while attempt < MAX_RETRIES:
tmp_file = None
attempt += 1
track_info["attempts"] = attempt
try:
url = await sr.get_stream_url_by_track_id(track_id, quality)
if not url:
raise RuntimeError("No stream URL")
parsed = urlparse(url)
clean_path = unquote(parsed.path)
ext = Path(clean_path).suffix or ".mp3"
tmp_file = Path(f"/tmp/{uuid.uuid4().hex}{ext}")
async with session.get(url) as resp:
resp.raise_for_status()
with open(tmp_file, "wb") as f:
async for chunk in resp.content.iter_chunked(64 * 1024):
f.write(chunk)
md = await sr.get_metadata_by_track_id(track_id) or {}
logging.info("Metadata for %s: %s", track_id, md)
artist_raw = md.get("artist") or "Unknown Artist"
album_raw = md.get("album") or "Unknown Album"
title_raw = md.get("title") or f"Track {track_id}"
artist = sanitize_filename(artist_raw)
album = sanitize_filename(album_raw)
title = sanitize_filename(title_raw)
all_artists.add(artist)
artist_dir = staging_root / artist
album_dir = artist_dir / album
album_dir.mkdir(parents=True, exist_ok=True)
final_file = ensure_unique_path(album_dir / f"{title}{ext}")
tag_with_mediafile(str(tmp_file), md)
tmp_file.rename(final_file)
tmp_file = None
track_info["status"] = "Success"
track_info["file_path"] = str(final_file)
track_info["error"] = None
all_final_files.append(final_file)
if job:
job.meta["progress"] = int(((i + 1) / total) * 100)
job.save_meta()
break
except aiohttp.ClientResponseError as e:
if e.status == 429:
wait_time = min(60, 2**attempt) # exponential up to 60s
logging.warning(
"Rate limited (429). Sleeping %s seconds", wait_time
)
await asyncio.sleep(wait_time)
else:
await asyncio.sleep(
random.uniform(THROTTLE_MIN, THROTTLE_MAX)
)
except Exception as e:
logging.error(
"Track %s attempt %s failed: %s", track_id, attempt, e
)
traceback.print_exc()
track_info["error"] = str(e)
if attempt >= MAX_RETRIES:
track_info["status"] = "Failed"
await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
finally:
try:
if tmp_file and tmp_file.exists():
tmp_file.unlink()
except Exception:
pass
per_track_meta.append(track_info)
if job:
try:
job.meta["tracks"] = per_track_meta
job.save_meta()
except Exception as e:
logging.warning(
"Failed to update job.meta after track %s: %s", track_id, e
)
await asyncio.sleep(random.uniform(THROTTLE_MIN, THROTTLE_MAX))
if not all_final_files:
if job:
try:
job.meta["tarball"] = None
job.meta["status"] = "Failed"
job.save_meta()
except Exception:
pass
return []
artist_counts: dict[str, int] = {}
for t in per_track_meta:
if t["status"] == "Success" and t.get("file_path"):
try:
artist = Path(t["file_path"]).relative_to(staging_root).parts[0]
except Exception:
artist = "Unknown Artist"
artist_counts[artist] = artist_counts.get(artist, 0) + 1
if artist_counts:
top_artist = sorted(artist_counts.items(), key=lambda kv: (-kv[1], kv[0]))[
0
][0]
else:
top_artist = "Unknown Artist"
combined_artist = sanitize_filename(top_artist)
staged_tarball = staging_root / f"{combined_artist}.tar.gz"
# Ensure uniqueness within the parent folder (Windows-style " (2)", " (3)" suffixes)
counter = 1
base_name = staged_tarball.stem
while staged_tarball.exists():
counter += 1
staged_tarball = staging_root / f"{base_name} ({counter}).tar.gz"
final_tarball = ROOT_DIR / "completed" / quality / staged_tarball.name
final_tarball.parent.mkdir(parents=True, exist_ok=True)
if job:
try:
job.meta["status"] = "Compressing"
job.save_meta()
except Exception:
pass
logging.info("Creating tarball: %s", staged_tarball)
def _create_tar_sync():
try:
subprocess.run(
[
"tar",
"-I",
"pigz -9",
"-cf",
str(staged_tarball),
"-C",
str(staging_root),
]
+ [str(f.relative_to(staging_root)) for f in all_final_files],
check=True,
)
for f in all_final_files:
try:
os.remove(f)
except Exception:
pass
except FileNotFoundError:
logging.warning("pigz not available, falling back to tarfile (slower).")
with tarfile.open(staged_tarball, "w:gz") as tar:
for f in all_final_files:
try:
arcname = f.relative_to(staging_root)
except ValueError:
arcname = f.name
tar.add(f, arcname=str(arcname))
try:
os.remove(f)
except Exception:
pass
await asyncio.to_thread(_create_tar_sync)
if not staged_tarball.exists():
logging.error("Tarball was not created: %s", staged_tarball)
if job:
try:
job.meta["status"] = "compress_failed"
job.save_meta()
except Exception:
pass
return []
logging.critical("Tarball created: %s", staged_tarball)
try:
staged_tarball.rename(final_tarball)
except Exception:
shutil.move(str(staged_tarball), str(final_tarball))
logging.critical("Tarball finalized: %s", final_tarball)
await asyncio.to_thread(shutil.rmtree, staging_root, ignore_errors=True)
if job:
job.meta["tarball"] = str(final_tarball)
job.meta["progress"] = 100
job.meta["status"] = "Completed"
job.save_meta()
return [str(final_tarball)]
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(process_tracks())
except Exception as e:
if job:
job.meta["status"] = "Failed"
job.save_meta()
logging.critical("Exception: %s", str(e))
finally:
loop.close()
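
bulk_download is written to run as an RQ job; a hedged sketch of enqueueing it (the queue name and Redis connection are illustrative, not taken from this diff):

from redis import Redis
from rq import Queue

from utils.rip_background import bulk_download

queue = Queue("trip", connection=Redis())  # queue name is an assumption
job = queue.enqueue(bulk_download, [123456789], "FLAC", job_timeout=3600)
print(job.id)  # progress/status/tarball land in job.meta as the job runs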

utils/sr_wrapper.py Normal file

@@ -0,0 +1,401 @@
from typing import Optional, Any
from uuid import uuid4
from urllib.parse import urlparse
import hashlib
import logging
import random
import asyncio
import os
import aiohttp
import time
from streamrip.client import TidalClient # type: ignore
from streamrip.config import Config as StreamripConfig # type: ignore
from dotenv import load_dotenv
load_dotenv()
class SRUtil:
"""
StreamRip API Utility Class
"""
def __init__(self) -> None:
"""Initialize StreamRip utility."""
self.streamrip_config = StreamripConfig.defaults()
self.streamrip_config.session.tidal.user_id = os.getenv("tidal_user_id", "")
self.streamrip_config.session.tidal.access_token = os.getenv(
"tidal_access_token", ""
)
self.streamrip_config.session.tidal.refresh_token = os.getenv(
"tidal_refresh_token", ""
)
self.streamrip_config.session.tidal.token_expiry = os.getenv(
"tidal_token_expiry", ""
)
self.streamrip_config.session.tidal.country_code = os.getenv(
"tidal_country_code", ""
)
self.streamrip_config.session.tidal.quality = int(
os.getenv("tidal_default_quality", 2)
)
self.streamrip_config.session.conversion.enabled = False
self.streamrip_config.session.downloads.folder = os.getenv(
"tidal_download_folder", ""
)
self.streamrip_client = TidalClient(self.streamrip_config)
self.MAX_CONCURRENT_METADATA_REQUESTS = 2
self.METADATA_RATE_LIMIT = 1.25
self.METADATA_SEMAPHORE = asyncio.Semaphore(self.MAX_CONCURRENT_METADATA_REQUESTS)
self.LAST_METADATA_REQUEST = 0
self.MAX_METADATA_RETRIES = 5
self.METADATA_ALBUM_CACHE: dict[str, dict] = {}
self.RETRY_DELAY = 1.0 # seconds between retries
async def rate_limited_request(self, func, *args, **kwargs):
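# Global throttle: the semaphore caps concurrent metadata calls and
# spaces requests at least METADATA_RATE_LIMIT seconds apart.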
async with self.METADATA_SEMAPHORE:
now = time.time()
elapsed = now - self.LAST_METADATA_REQUEST
if elapsed < self.METADATA_RATE_LIMIT:
await asyncio.sleep(self.METADATA_RATE_LIMIT - elapsed)
result = await func(*args, **kwargs)
self.LAST_METADATA_REQUEST = time.time()  # shared limiter timestamp read above
return result
def dedupe_by_key(self, key: str, entries: list[dict]) -> list[dict]:
deduped = {}
for entry in entries:
norm = entry[key].strip().lower()
if norm not in deduped:
deduped[norm] = entry
return list(deduped.values())
def format_duration(self, seconds):
if not seconds:
return None
m, s = divmod(seconds, 60)
return f"{m}:{s:02}"
def combine_album_track_metadata(
self, album_json: dict | None, track_json: dict
) -> dict:
"""
Combine album-level and track-level metadata into a unified tag dictionary.
Track-level metadata overrides album-level where relevant.
"""
album_json = album_json or {}
# Album-level
combined = {
"album": album_json.get("title"),
"album_artist": album_json.get("artist", {}).get("name"),
"release_date": album_json.get("releaseDate"),
"album_type": album_json.get("type"),
"total_tracks": album_json.get("numberOfTracks"),
"upc": album_json.get("upc"),
"album_copyright": album_json.get("copyright"),
"album_cover_id": album_json.get("cover"),
"album_cover_url": f"https://resources.tidal.com/images/{album_json.get('cover')}/1280x1280.jpg"
if album_json.get("cover")
else None,
}
# Track-level (overrides or adds to album info)
combined.update(
{
"title": track_json.get("title"),
"artist": track_json.get("artist", {}).get("name"),
"artists": [a.get("name") for a in track_json.get("artists", [])],
"track_number": track_json.get("trackNumber"),
"disc_number": track_json.get("volumeNumber"),
"duration": track_json.get("duration"),
"isrc": track_json.get("isrc"),
"bpm": track_json.get("bpm"),
"explicit": track_json.get("explicit"),
"replaygain": track_json.get("replayGain"),
"peak": track_json.get("peak"),
"lyrics": track_json.get("lyrics"),
"track_copyright": track_json.get("copyright"),
"cover_id": track_json.get("album", {}).get("cover") or album_json.get("cover"),
"cover_url": (
f"https://resources.tidal.com/images/{track_json.get('album', {}).get('cover', album_json.get('cover'))}/1280x1280.jpg"
if (track_json.get("album", {}).get("cover") or album_json.get("cover"))
else None
),
}
)
return combined
def combine_album_with_all_tracks(
self, album_json: dict[str, Any]
) -> list[dict[str, Any]]:
"""Return a list of combined metadata dicts for all tracks in an album JSON."""
return [
self.combine_album_track_metadata(album_json, t)
for t in album_json.get("tracks", [])
]
async def get_artists_by_name(self, artist_name: str) -> Optional[list]:
"""Get artist(s) by name.
Args:
artist_name (str): The name of the artist.
Returns:
Optional[dict]: The artist details or None if not found.
"""
try:
await self.streamrip_client.login()
except Exception as e:
logging.info("Login Exception: %s", str(e))
pass
artists_out: list[dict] = []
try:
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
except AttributeError:
await self.streamrip_client.login()
artists = await self.streamrip_client.search(
media_type="artist", query=artist_name
)
logging.critical("Artists output: %s", artists)
artists = artists[0].get("items", [])
if not artists:
logging.warning("No artist found for name: %s", artist_name)
return None
artists_out = [
{
"artist": res["name"],
"id": res["id"],
}
for res in artists
if "name" in res and "id" in res
]
artists_out = self.dedupe_by_key("artist", artists_out) # Remove duplicates
return artists_out
async def get_albums_by_artist_id(self, artist_id: int) -> Optional[list | dict]:
"""Get albums by artist ID
Args:
artist_id (int): The ID of the artist.
Returns:
Optional[list[dict]]: List of albums or None if not found.
"""
artist_id_str: str = str(artist_id)
albums_out: list[dict] = []
try:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
)
except AttributeError:
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=artist_id_str, media_type="artist"
)
if not metadata:
logging.warning("No metadata found for artist ID: %s", artist_id)
return None
albums = self.dedupe_by_key("title", metadata.get("albums", []))
albums_out = [
{
"artist": ", ".join(artist["name"] for artist in album["artists"]),
"album": album["title"],
"id": album["id"],
"release_date": album.get("releaseDate", "Unknown"),
}
for album in albums
if "title" in album and "id" in album and "artists" in album
]
logging.debug("Retrieved albums: %s", albums_out)
return albums_out
async def get_tracks_by_album_id(
self, album_id: int, quality: str = "FLAC"
) -> Optional[list | dict]:
"""Get tracks by album ID
Args:
album_id (int): The ID of the album.
Returns:
Optional[list[dict]]: List of tracks or None if not found.
"""
album_id_str = str(album_id)
await self.streamrip_client.login()
metadata = await self.streamrip_client.get_metadata(
item_id=album_id_str, media_type="album"
)
if not metadata:
logging.warning("No metadata found for album ID: %s", album_id)
return None
track_list = metadata.get("tracks", [])
tracks_out: list[dict] = [
{
"id": track.get("id"),
"artist": track.get("artist").get("name"),
"title": track.get("title"),
"duration": self.format_duration(track.get("duration", 0)),
"version": track.get("version"),
"audioQuality": track.get("audioQuality"),
}
for track in track_list
]
return tracks_out
async def get_tracks_by_artist_song(self, artist: str, song: str) -> Optional[list]:
"""Get track by artist and song name.
Args:
artist (str): The name of the artist.
song (str): The name of the song.
Returns:
Optional[list]: Currently always an empty list.
TODO: Reimplement using StreamRip
"""
return []
async def get_stream_url_by_track_id(
self, track_id: int, quality: str = "FLAC"
) -> Optional[str]:
"""Get stream URL by track ID
Args:
track_id (int): The ID of the track.
quality (str): The quality of the stream; default is "FLAC". Other option: "Lossy".
Returns:
Optional[str]: The stream URL or None if not found.
"""
if quality not in ["FLAC", "Lossy"]:
logging.error("Invalid quality requested: %s", quality)
return None
quality_int: int = int(self.streamrip_config.session.tidal.quality)
match quality:
case "FLAC":
quality_int = 2
case "Lossy":
quality_int = 1
track_id_str: str = str(track_id)
await self.streamrip_client.login()
try:
logging.critical("Using quality_int: %s", quality_int)
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=quality_int
)
except AttributeError:
await self.streamrip_client.login()
track = await self.streamrip_client.get_downloadable(
track_id=track_id_str, quality=quality_int
)
if not track:
logging.warning("No track found for ID: %s", track_id)
return None
stream_url = track.url
if not stream_url:
logging.warning("No stream URL found for track ID: %s", track_id)
return None
return stream_url
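# Quality mapping sketch: the match above sends 2 to the client for "FLAC"
# and 1 for "Lossy"; any other string is rejected before a client call is made.
#
#   url = await trip.get_stream_url_by_track_id(456, quality="FLAC")
#   # -> signed stream URL string, or None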
async def get_metadata_by_track_id(self, track_id: int) -> Optional[dict]:
"""
Fetch track + album metadata with retries, caching album data.
Returns combined metadata dict or None after exhausting retries.
"""
for attempt in range(1, self.MAX_METADATA_RETRIES + 1):
try:
await self.streamrip_client.login()
# Track metadata
metadata = await self.rate_limited_request(
self.streamrip_client.get_metadata, str(track_id), "track"
)
album_id = metadata.get("album", {}).get("id")
album_metadata = None
if album_id:
# Check cache first
if album_id in self.METADATA_ALBUM_CACHE:
album_metadata = self.METADATA_ALBUM_CACHE[album_id]
else:
album_metadata = await self.rate_limited_request(
self.streamrip_client.get_metadata, album_id, "album"
)
if not album_metadata:
return None
self.METADATA_ALBUM_CACHE[album_id] = album_metadata
# Combine track + album metadata
if not album_metadata:
return None
combined_metadata: dict = self.combine_album_track_metadata(
album_metadata, metadata
)
logging.info(
"Combined metadata for track ID %s (attempt %d): %s",
track_id,
attempt,
combined_metadata,
)
return combined_metadata
except Exception as e:
# Exponential backoff with jitter for 429 or other errors
delay = self.RETRY_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
logging.warning(
"Metadata fetch failed for track %s (attempt %d/%d): %s. Retrying in %.2fs",
track_id,
attempt,
self.MAX_METADATA_RETRIES,
str(e),
delay,
)
if attempt < self.MAX_METADATA_RETRIES:
await asyncio.sleep(delay)
else:
logging.error(
"Metadata fetch failed permanently for track %s after %d attempts",
track_id,
self.MAX_METADATA_RETRIES,
)
return None
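# Backoff sketch (assumed values RETRY_DELAY = 1.0, MAX_METADATA_RETRIES = 4):
#
#   delays = [1.0 * 2 ** (a - 1) for a in range(1, 4)]  # [1.0, 2.0, 4.0]
#
# plus up to 0.5s of jitter per attempt, so a track that fails every attempt
# costs roughly 7-8.5s before the method gives up and returns None.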
async def download(self, track_id: int, quality: str = "FLAC") -> bool | str:
"""Download track
Args:
track_id (int)
quality (str): The quality of the stream; default is "FLAC". Other option: "Lossy".
Returns:
bool | str: The downloaded file path on success, False on failure.
"""
try:
await self.streamrip_client.login()
track_url = await self.get_stream_url_by_track_id(track_id, quality)
if not track_url:
return False
parsed_url = urlparse(track_url)
parsed_url_filename = os.path.basename(parsed_url.path)
parsed_url_ext = os.path.splitext(parsed_url_filename)[1]  # includes leading "."
unique = hashlib.sha256(uuid4().bytes).hexdigest()[:16]
dl_folder_path = (
f"{self.streamrip_config.session.downloads.folder}/{unique}"
)
os.makedirs(dl_folder_path, exist_ok=True)  # ensure the per-download folder exists
dl_path = f"{dl_folder_path}/{track_id}{parsed_url_ext}"
async with aiohttp.ClientSession() as session:
async with session.get(
track_url, headers={}, timeout=aiohttp.ClientTimeout(total=60)
) as resp:
resp.raise_for_status()
with open(dl_path, "wb") as f:
async for chunk in resp.content.iter_chunked(1024 * 64):
f.write(chunk)
return dl_path
except Exception as e:
logging.critical("Error: %s", str(e))
return False
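# Usage sketch: on success the file path (not True) is returned, so callers
# should test the result truthily rather than comparing against True:
#
#   result = await trip.download(456, quality="FLAC")
#   if result:
#       print("saved to", result)  # e.g. <downloads.folder>/<16-hex>/456.flac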

utils/test.conf Normal file
@@ -0,0 +1,35 @@
# -----------------------
# /m/m2/ PHP handler
location ~ ^/m/m2/(.+\.php)$ {
alias /storage/music2/completed/;
include fastcgi_params;
fastcgi_pass unix:/run/php/php8.2-fpm.sock;
fastcgi_param SCRIPT_FILENAME /storage/music2/completed/$1;
fastcgi_param DOCUMENT_ROOT /storage/music2/completed;
fastcgi_param SCRIPT_NAME /m/m2/$1;
}
# /m/m2/ static files
location /m/m2/ {
alias /storage/music2/completed/;
index index.php;
try_files $uri $uri/ /m/m2/index.php$is_args$args;
}
# -----------------------
# /m/ PHP handler
location ~ ^/m/(.+\.php)$ {
root /var/www/codey.lol/new/public;
include fastcgi_params;
fastcgi_pass unix:/run/php/php8.2-fpm.sock;
fastcgi_param SCRIPT_FILENAME $document_root/$1;
fastcgi_param DOCUMENT_ROOT $document_root;
fastcgi_param SCRIPT_NAME /m/$1;
}
# /m/ static files
location /m/ {
root /var/www/codey.lol/new/public;
index index.php;
try_files $uri $uri/ /m/index.php$is_args$args;
}
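# Illustrative request mapping for the blocks above (assumes PHP-FPM 8.2 is
# listening on the unix socket shown; paths are examples):
#   GET /m/m2/index.php  -> FastCGI, SCRIPT_FILENAME /storage/music2/completed/index.php
#   GET /m/m2/art/a.jpg  -> static file /storage/music2/completed/art/a.jpg
#   GET /m/stats.php     -> FastCGI under /var/www/codey.lol/new/public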