api/utils/meme_util.py
2025-05-20 11:14:08 -04:00

151 lines
5.2 KiB
Python

import os
import logging
import io
import math
from typing import Optional
import aiosqlite as sqlite3
from PIL import Image
class MemeUtil:
"""
Meme Utils
"""
def __init__(self, constants) -> None:
self.constants = constants
self.meme_db_path = os.path.join("/usr/local/share", "sqlite_dbs", "meme.db")
def is_png(self, buffer: bytes | io.BytesIO) -> bool:
"""
Check if image (in-memory buffer, or bytes) is a PNG
Args:
buffer (bytes|io.BytesIO)
Returns:
bool
"""
# Accepts either bytes or a BytesIO-like object
if isinstance(buffer, io.BytesIO):
if hasattr(buffer, "read") and hasattr(buffer, "seek"):
pos = buffer.tell()
buffer.seek(0)
signature = buffer.read(8)
buffer.seek(pos)
else:
signature = buffer[:8]
return signature == b"\x89PNG\r\n\x1a\n"
def convert_to_png(self, in_buffer: io.BytesIO) -> bytes:
"""
Convert an in-memory buffer to PNG
Args:
in_buffer (io.BytesIO)
Returns:
bytes
"""
in_buffer.seek(0)
with Image.open(in_buffer) as im:
if im.format == "PNG":
raise ValueError("Already a PNG")
out_buffer = io.BytesIO()
im.save(out_buffer, format="PNG")
out_buffer.seek(0)
return out_buffer.read()
async def get_meme_by_id(self, meme_id: int) -> Optional[bytes]:
"""
Get meme by id
Args:
meme_id (int)
Returns:
Optional[bytes]
"""
ret_image: Optional[bytes] = None
buffer: Optional[io.BytesIO] = None
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT image FROM memes WHERE id = ? LIMIT 1"
async with await db_conn.execute(query, (meme_id,)) as db_cursor:
result = await db_cursor.fetchone()
if not result:
return None
buffer = io.BytesIO(result["image"])
is_png = self.is_png(buffer)
if not is_png:
logging.debug("Converting %s, not detected as PNG", meme_id)
ret_image = self.convert_to_png(buffer)
else:
ret_image = result["image"]
return ret_image
async def get_random_meme(self) -> Optional[bytes]:
"""
Get random meme
Returns:
Optional[bytes]
"""
ret_image: Optional[bytes] = None
buffer: Optional[io.BytesIO] = None
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT id, image FROM memes ORDER BY RANDOM() LIMIT 1"
async with await db_conn.execute(query) as db_cursor:
result = await db_cursor.fetchone()
if not result:
return None
meme_id = result["id"]
buffer = io.BytesIO(result["image"])
is_png = self.is_png(buffer)
if not is_png:
logging.debug("Converting %s, not detected as PNG", meme_id)
ret_image = self.convert_to_png(buffer)
else:
ret_image = result["image"]
return ret_image
async def list_memes(self, page: int) -> Optional[list]:
"""
List memes (paginated)
Args:
page (id)
Returns:
Optional[list]
"""
out_result: list = []
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
rows_per_page: int = 10
offset: int = (page - 1) * rows_per_page
query: str = "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT 10 OFFSET ?"
async with await db_conn.execute(query, (offset,)) as db_cursor:
results = await db_cursor.fetchall()
for result in results:
result_id = result["id"]
result_timestamp = result["timestamp"]
out_result.append(
{
"id": result_id,
"timestamp": result_timestamp,
}
)
return out_result
async def get_page_count(self) -> Optional[int]:
"""
Get page count
Returns:
Optional[int]
"""
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
rows_per_page: int = 10
pages: Optional[int] = None
query: str = "SELECT count(id) AS count FROM memes"
async with await db_conn.execute(query) as db_cursor:
result = await db_cursor.fetchone()
count = result["count"]
if not isinstance(count, int):
return None
pages = math.ceil(count / rows_per_page)
return pages