import os
import io
from typing import Optional

import aiosqlite as sqlite3
from PIL import Image


class MemeUtil:
    """
    Helpers for reading memes out of the SQLite meme database.

    Images are stored as BLOBs in the ``memes`` table (columns used here:
    ``id``, ``timestamp``, ``image``) and are always handed back to callers
    as PNG-encoded bytes.
    """

    # Single source of truth for the page size used by list_memes(); it feeds
    # both the LIMIT and the OFFSET so the two can never drift apart.
    ROWS_PER_PAGE: int = 10

    def __init__(self, constants) -> None:
        """
        Args:
            constants: Stored on the instance as ``self.constants``; it is not
                read by any method in this class.
        """
        self.constants = constants
        self.meme_db_path = os.path.join("/usr/local/share", "sqlite_dbs", "meme.db")

    def convert_to_png(self, in_buffer: io.BytesIO) -> bytes:
        """
        Re-encode the image held in ``in_buffer`` as PNG.

        Args:
            in_buffer (io.BytesIO): Buffer containing an encoded, non-PNG
                image. It is rewound before being read.
        Returns:
            bytes: The PNG-encoded image.
        Raises:
            ValueError: If the image in ``in_buffer`` is already a PNG.
        """
        in_buffer.seek(0)
        with Image.open(in_buffer) as im:
            if im.format == "PNG":
                raise ValueError("Already a PNG")
            out_buffer = io.BytesIO()
            im.save(out_buffer, format="PNG")
        return out_buffer.getvalue()

    async def get_meme_by_id(self, meme_id: int) -> Optional[bytes]:
        """
        Get meme by id
        Args:
            meme_id (int): Primary key of the meme to fetch.
        Returns:
            Optional[bytes]: PNG-encoded image bytes, or None when no row
                with that id exists.
        """
        query: str = "SELECT image FROM memes WHERE id = ? LIMIT 1"
        async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(query, (meme_id,)) as db_cursor:
                result = await db_cursor.fetchone()
                if not result:
                    return None
                image_bytes = result["image"]

        # The connection is released at this point: decoding/re-encoding only
        # needs the fetched bytes, so there is no reason to hold the DB open.
        with Image.open(io.BytesIO(image_bytes)) as im:
            if im.format == "PNG":
                # Already in the target format; return the stored bytes as-is.
                return image_bytes
        return self.convert_to_png(io.BytesIO(image_bytes))

    async def list_memes(self, page: int) -> list:
        """
        List memes (paginated), newest first
        Args:
            page (int): 1-based page number. Values below 1 are treated as
                page 1.
        Returns:
            list: Up to ROWS_PER_PAGE dicts of the form
                ``{"id": ..., "timestamp": ...}``; empty when the page is
                past the end of the table.
        """
        rows_per_page: int = self.ROWS_PER_PAGE
        # Clamp so a page of 0 or below never yields a negative OFFSET.
        offset: int = (max(page, 1) - 1) * rows_per_page
        query: str = (
            "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT ? OFFSET ?"
        )
        async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(
                query, (rows_per_page, offset)
            ) as db_cursor:
                results = await db_cursor.fetchall()

        return [
            {
                "id": result["id"],
                "timestamp": result["timestamp"],
            }
            for result in results
        ]