import os
import logging
import io
from typing import Optional

import aiosqlite as sqlite3
from PIL import Image


class MemeUtil:
    """
    Meme Utils

    Reads memes from the SQLite meme database and normalises the stored
    image blobs to PNG.
    """

    # Fixed 8-byte signature that starts every PNG file.
    PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"

    def __init__(self, constants) -> None:
        """
        Store the shared constants object and the meme database path.
        Args:
            constants: stored as-is on the instance; not otherwise used here
        """
        self.constants = constants
        self.meme_db_path = os.path.join("/usr/local/share", "sqlite_dbs", "meme.db")

    def is_png(self, buffer: bytes | io.BytesIO) -> bool:
        """
        Check if image (in-memory buffer, or bytes) is a PNG
        Only the leading 8-byte signature is inspected; the image is not
        decoded. A stream's read position is left where it was found.
        Args:
            buffer (bytes|io.BytesIO)
        Returns:
            bool
        """
        if isinstance(buffer, (bytes, bytearray, memoryview)):
            signature = bytes(buffer[:8])
        else:
            # Stream input: peek at the header, then put the cursor back so
            # the caller can keep reading from where it left off.
            pos = buffer.tell()
            try:
                buffer.seek(0)
                signature = buffer.read(8)
            finally:
                buffer.seek(pos)
        return signature == self.PNG_SIGNATURE

    def convert_to_png(self, in_buffer: io.BytesIO) -> bytes:
        """
        Convert an in-memory buffer to PNG
        Args:
            in_buffer (io.BytesIO)
        Returns:
            bytes
        Raises:
            ValueError: if Pillow reports the input is already a PNG
        """
        in_buffer.seek(0)
        with Image.open(in_buffer) as im:
            if im.format == "PNG":
                raise ValueError("Already a PNG")
            out_buffer = io.BytesIO()
            im.save(out_buffer, format="PNG")
            return out_buffer.getvalue()

    async def get_meme_by_id(self, meme_id: int) -> Optional[bytes]:
        """
        Get meme by id
        The stored image is returned as PNG bytes; non-PNG blobs are
        converted before being returned.
        Args:
            meme_id (int)
        Returns:
            Optional[bytes]: PNG data, or None when no row matches or the
                row has no image stored
        """
        query: str = "SELECT image FROM memes WHERE id = ? LIMIT 1"
        async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with db_conn.execute(query, (meme_id,)) as db_cursor:
                row = await db_cursor.fetchone()
                image = row["image"] if row else None

        # The connection is closed by now, so the (potentially slow) image
        # conversion below does not hold the database open.
        if image is None:
            return None
        if self.is_png(image):
            return image
        logging.debug("Converting %s, not detected as PNG", meme_id)
        return self.convert_to_png(io.BytesIO(image))

    async def list_memes(self, page: int, rows_per_page: int = 10) -> Optional[list]:
        """
        List memes (paginated), newest first
        Args:
            page (int): 1-based page number; values below 1 are treated as 1
            rows_per_page (int): page size, defaults to 10
        Returns:
            list: dicts with "id" and "timestamp" keys; empty when the page
                is past the end of the table
        Raises:
            ValueError: if rows_per_page is less than 1
        """
        if rows_per_page < 1:
            raise ValueError("rows_per_page must be at least 1")
        # SQLite already treats a negative OFFSET as zero; clamp here so the
        # behaviour is explicit rather than relying on that.
        offset: int = (max(page, 1) - 1) * rows_per_page
        query: str = (
            "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT ? OFFSET ?"
        )
        async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with db_conn.execute(query, (rows_per_page, offset)) as db_cursor:
                results = await db_cursor.fetchall()
        return [
            {
                "id": result["id"],
                "timestamp": result["timestamp"],
            }
            for result in results
        ]