cleanup/minor

This commit is contained in:
2025-03-04 08:11:55 -05:00
parent a023868671
commit f29837280a
6 changed files with 103 additions and 46 deletions

View File

@ -1,7 +1,11 @@
import logging
import time
from typing import Optional
from fastapi import FastAPI
import os
from typing import Optional, Annotated
from fastapi import (
FastAPI, Request, UploadFile,
Response, HTTPException, Form
)
from fastapi.responses import JSONResponse
import redis.asyncio as redis
from lyric_search.sources import private, cache as LyricsCache, redis_cache
@ -19,17 +23,52 @@ class Misc(FastAPI):
self.redis_cache = redis_cache.RedisCache()
self.redis_client = redis.Redis(password=private.REDIS_PW)
self.radio = radio
self.activity_image: Optional[bytes] = None
self.endpoints: dict = {
"widget/redis": self.homepage_redis_widget,
"widget/sqlite": self.homepage_sqlite_widget,
"widget/lyrics": self.homepage_lyrics_widget,
"widget/radio": self.homepage_radio_widget,
"misc/get_activity_image": self.get_activity_image,
}
for endpoint, handler in self.endpoints.items():
app.add_api_route(f"/{endpoint}", handler, methods=["GET"],
include_in_schema=False)
include_in_schema=True)
app.add_api_route("/misc/upload_activity_image",
self.upload_activity_image, methods=["POST"])
async def upload_activity_image(self,
image: UploadFile,
key: Annotated[str, Form()], request: Request) -> Response:
if not key or not isinstance(key, str)\
or not self.util.check_key(path=request.url.path, req_type=2, key=key):
raise HTTPException(status_code=403, detail="Unauthorized")
if not image:
return JSONResponse(status_code=500, content={
'err': True,
'errorText': 'Invalid request',
})
self.activity_image = await image.read()
return JSONResponse(content={
'success': True,
})
async def get_activity_image(self, request: Request) -> Response:
if isinstance(self.activity_image, bytes):
return Response(content=self.activity_image,
media_type="image/png")
# Fallback
fallback_path = os.path.join("/var/www/codey.lol/public",
"images", "plex_placeholder.png")
with open(fallback_path, 'rb') as f:
return Response(content=f.read(),
media_type="image/png")
async def get_radio_np(self) -> str:
"""
Get radio now playing