import io
import logging
import math
import os
import traceback
from typing import Optional

import aiosqlite as sqlite3
from PIL import Image


class MemeUtil:
    """
    Meme Utils

    Helpers for reading memes stored as image blobs in a SQLite database
    and for normalising those images to PNG.
    """

    # Leading eight bytes of every PNG file.
    PNG_SIGNATURE: bytes = b"\x89PNG\r\n\x1a\n"
    # Page size shared by list_memes() and get_page_count().
    ROWS_PER_PAGE: int = 10

    def __init__(self, constants) -> None:
        self.constants = constants
        self.meme_db_path = os.path.join("/usr/local/share", "sqlite_dbs", "meme.db")

    def is_png(self, buffer: bytes | io.BytesIO) -> bool:
        """
        Check if image (in-memory buffer, or bytes) is a PNG
        Args:
            buffer (bytes|io.BytesIO): raw image data; a BytesIO has its
                read position restored before returning
        Returns:
            bool: True when the data starts with the PNG signature
        """
        if isinstance(buffer, io.BytesIO):
            # Peek at the header without disturbing the caller's position.
            pos = buffer.tell()
            buffer.seek(0)
            signature = buffer.read(8)
            buffer.seek(pos)
        else:
            signature = buffer[:8]
        return signature == self.PNG_SIGNATURE

    def convert_to_png(self, in_buffer: io.BytesIO) -> bytes:
        """
        Convert an in-memory buffer to PNG
        Args:
            in_buffer (io.BytesIO): encoded source image
        Returns:
            bytes: the PNG-encoded image
        Raises:
            ValueError: if the source image is already a PNG
        """
        in_buffer.seek(0)
        with Image.open(in_buffer) as im:
            if im.format == "PNG":
                raise ValueError("Already a PNG")
            out_buffer = io.BytesIO()
            im.save(out_buffer, format="PNG")
        return out_buffer.getvalue()

    async def get_meme_by_id(self, meme_id: int) -> Optional[bytes]:
        """
        Get meme by id

        A stored image that is not a PNG is converted, and the converted
        copy is written back to the database.
        Args:
            meme_id (int)
        Returns:
            Optional[bytes]: PNG image data, or None if no such meme exists
        """
        query: str = "SELECT image FROM memes WHERE id = ? LIMIT 1"
        async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(query, (meme_id,)) as db_cursor:
                result = await db_cursor.fetchone()
        # The read connection is closed before any write-back so the UPDATE
        # (issued on its own connection) never has to wait on our own
        # still-open reader.
        if not result:
            return None

        image = result["image"]
        buffer = io.BytesIO(image)
        if self.is_png(buffer):
            return image

        logging.debug("Converting meme_id: %s, not detected as PNG", meme_id)
        ret_image = self.convert_to_png(buffer)
        converted = await self.replace_with_converted_png(meme_id, ret_image)
        if converted:
            logging.info("Converted meme_id: %s", meme_id)
        else:
            logging.info("Failed to convert meme_id: %s", meme_id)
        return ret_image

    async def get_random_meme(self) -> Optional[bytes]:
        """
        Get random meme

        A non-PNG image is converted for the caller but not written back.
        Returns:
            Optional[bytes]: PNG image data, or None if the table is empty
        """
        query: str = "SELECT id, image FROM memes ORDER BY RANDOM() LIMIT 1"
        async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(query) as db_cursor:
                result = await db_cursor.fetchone()
        if not result:
            return None

        meme_id = result["id"]
        image = result["image"]
        buffer = io.BytesIO(image)
        if self.is_png(buffer):
            return image

        logging.debug("Converting %s, not detected as PNG", meme_id)
        return self.convert_to_png(buffer)

    async def list_memes(self, page: int) -> Optional[list]:
        """
        List memes (paginated)
        Args:
            page (int): 1-based page number; values below 1 yield page 1
        Returns:
            Optional[list]: dicts with "id" and "timestamp" keys, newest
                first; empty when the page is past the end
        """
        offset: int = (max(page, 1) - 1) * self.ROWS_PER_PAGE
        query: str = (
            "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT ? OFFSET ?"
        )
        async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(
                query, (self.ROWS_PER_PAGE, offset)
            ) as db_cursor:
                results = await db_cursor.fetchall()
        return [
            {
                "id": result["id"],
                "timestamp": result["timestamp"],
            }
            for result in results
        ]

    async def get_page_count(self) -> Optional[int]:
        """
        Get page count
        Returns:
            Optional[int]: number of pages at ROWS_PER_PAGE memes per page,
                or None if the count could not be read as an int
        """
        query: str = "SELECT count(id) AS count FROM memes"
        async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(query) as db_cursor:
                result = await db_cursor.fetchone()
        if result is None:
            return None
        count = result["count"]
        if not isinstance(count, int):
            return None
        return math.ceil(count / self.ROWS_PER_PAGE)

    async def replace_with_converted_png(self, meme_id: int, meme_image: bytes) -> bool:
        """
        Replace stored image with converted PNG
        Args:
            meme_id (int)
            meme_image (bytes): PNG-encoded replacement image
        Returns:
            bool: True if a row was updated and committed; False if no row
                matched or the update raised
        """
        update_query: str = (
            "UPDATE memes SET image = ?, file_ext = 'PNG' WHERE id = ?"
        )
        params: tuple = (
            meme_image,
            meme_id,
        )
        try:
            async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
                async with await db_conn.execute(update_query, params) as db_cursor:
                    # rowcount reports how many rows the UPDATE touched.
                    update = db_cursor.rowcount
                # Closing the connection does not commit; without this the
                # UPDATE would be discarded.
                await db_conn.commit()
            if update < 1:
                logging.info(
                    "replace_with_converted_png: Failed -> Update: %s\nFor meme_id: %s",
                    update,
                    meme_id,
                )
                return False
            return True
        except Exception as e:
            # Best-effort write-back: report the failure, never raise.
            logging.info("replace_with_converted_png: %s", str(e))
            traceback.print_exc()
            return False