195 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			195 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import logging
 | |
| import io
 | |
| import traceback
 | |
| import math
 | |
| from typing import Optional
 | |
| import aiosqlite as sqlite3
 | |
| from PIL import Image
 | |
| 
 | |
| 
 | |
| class MemeUtil:
 | |
|     """
 | |
|     Meme Utils
 | |
|     """
 | |
| 
 | |
|     def __init__(self, constants) -> None:
 | |
|         self.constants = constants
 | |
|         self.meme_db_path = os.path.join("/usr/local/share", "sqlite_dbs", "meme.db")
 | |
| 
 | |
|     def is_png(self, buffer: bytes | io.BytesIO) -> bool:
 | |
|         """
 | |
|         Check if image (in-memory buffer, or bytes) is a PNG
 | |
|         Args:
 | |
|             buffer (bytes|io.BytesIO)
 | |
|         Returns:
 | |
|             bool
 | |
|         """
 | |
|         # Accepts either bytes or a BytesIO-like object
 | |
|         signature = None
 | |
|         if isinstance(buffer, io.BytesIO):
 | |
|             if hasattr(buffer, "read") and hasattr(buffer, "seek"):
 | |
|                 pos = buffer.tell()
 | |
|                 buffer.seek(0)
 | |
|                 signature = buffer.read(8)
 | |
|                 buffer.seek(pos)
 | |
|         else:
 | |
|             signature = buffer[:8]
 | |
|         return signature == b"\x89PNG\r\n\x1a\n"
 | |
| 
 | |
|     def convert_to_png(self, in_buffer: io.BytesIO) -> bytes:
 | |
|         """
 | |
|         Convert an in-memory buffer to PNG
 | |
|         Args:
 | |
|             in_buffer (io.BytesIO)
 | |
|         Returns:
 | |
|             bytes
 | |
|         """
 | |
|         in_buffer.seek(0)
 | |
|         with Image.open(in_buffer) as im:
 | |
|             if im.format == "PNG":
 | |
|                 raise ValueError("Already a PNG")
 | |
|             out_buffer = io.BytesIO()
 | |
|             im.save(out_buffer, format="PNG")
 | |
|             out_buffer.seek(0)
 | |
|         return out_buffer.read()
 | |
| 
 | |
|     async def get_meme_by_id(self, meme_id: int) -> Optional[bytes]:
 | |
|         """
 | |
|         Get meme by id
 | |
|         Args:
 | |
|             meme_id (int)
 | |
|         Returns:
 | |
|             Optional[bytes]
 | |
|         """
 | |
|         ret_image: Optional[bytes] = None
 | |
|         buffer: Optional[io.BytesIO] = None
 | |
|         async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
 | |
|             db_conn.row_factory = sqlite3.Row
 | |
|             query: str = "SELECT image FROM memes WHERE id = ? LIMIT 1"
 | |
|             async with await db_conn.execute(query, (meme_id,)) as db_cursor:
 | |
|                 result = await db_cursor.fetchone()
 | |
|                 if not result:
 | |
|                     return None
 | |
|                 buffer = io.BytesIO(result["image"])
 | |
|                 is_png = self.is_png(buffer)
 | |
|                 if not is_png:
 | |
|                     logging.debug(
 | |
|                         "Converting meme_id: %s, not detected as PNG", meme_id
 | |
|                     )
 | |
|                     ret_image = self.convert_to_png(buffer)
 | |
|                     converted = await self.replace_with_converted_png(
 | |
|                         meme_id, ret_image
 | |
|                     )
 | |
|                     if converted:
 | |
|                         logging.info("Converted meme_id: %s", meme_id)
 | |
|                     else:
 | |
|                         logging.info("Failed to convert meme_id: %s", meme_id)
 | |
|                 else:
 | |
|                     ret_image = result["image"]
 | |
|                 return ret_image
 | |
| 
 | |
|     async def get_random_meme(self) -> Optional[bytes]:
 | |
|         """
 | |
|         Get random meme
 | |
|         Returns:
 | |
|             Optional[bytes]
 | |
|         """
 | |
|         ret_image: Optional[bytes] = None
 | |
|         buffer: Optional[io.BytesIO] = None
 | |
|         async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
 | |
|             db_conn.row_factory = sqlite3.Row
 | |
|             query: str = "SELECT id, image FROM memes ORDER BY RANDOM() LIMIT 1"
 | |
|             async with await db_conn.execute(query) as db_cursor:
 | |
|                 result = await db_cursor.fetchone()
 | |
|                 if not result:
 | |
|                     return None
 | |
|                 meme_id = result["id"]
 | |
|                 buffer = io.BytesIO(result["image"])
 | |
|                 is_png = self.is_png(buffer)
 | |
|                 if not is_png:
 | |
|                     logging.debug("Converting %s, not detected as PNG", meme_id)
 | |
|                     ret_image = self.convert_to_png(buffer)
 | |
|                 else:
 | |
|                     ret_image = result["image"]
 | |
|                 return ret_image
 | |
| 
 | |
|     async def list_memes(self, page: int) -> Optional[list]:
 | |
|         """
 | |
|         List memes (paginated)
 | |
|         Args:
 | |
|             page (id)
 | |
|         Returns:
 | |
|             Optional[list]
 | |
|         """
 | |
|         out_result: list = []
 | |
|         async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
 | |
|             db_conn.row_factory = sqlite3.Row
 | |
|             rows_per_page: int = 10
 | |
|             offset: int = (page - 1) * rows_per_page
 | |
|             query: str = "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT 10 OFFSET ?"
 | |
|             async with await db_conn.execute(query, (offset,)) as db_cursor:
 | |
|                 results = await db_cursor.fetchall()
 | |
|                 for result in results:
 | |
|                     result_id = result["id"]
 | |
|                     result_timestamp = result["timestamp"]
 | |
|                     out_result.append(
 | |
|                         {
 | |
|                             "id": result_id,
 | |
|                             "timestamp": result_timestamp,
 | |
|                         }
 | |
|                     )
 | |
|         return out_result
 | |
| 
 | |
|     async def get_page_count(self) -> Optional[int]:
 | |
|         """
 | |
|         Get page count
 | |
|         Returns:
 | |
|             Optional[int]
 | |
|         """
 | |
|         async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
 | |
|             db_conn.row_factory = sqlite3.Row
 | |
|             rows_per_page: int = 10
 | |
|             pages: Optional[int] = None
 | |
|             query: str = "SELECT count(id) AS count FROM memes"
 | |
|             async with await db_conn.execute(query) as db_cursor:
 | |
|                 result = await db_cursor.fetchone()
 | |
|                 if not result:
 | |
|                     return None
 | |
|                 count = result["count"]
 | |
|                 if not isinstance(count, int):
 | |
|                     return None
 | |
|                 pages = math.ceil(count / rows_per_page)
 | |
|                 return pages
 | |
| 
 | |
|     async def replace_with_converted_png(self, meme_id: int, meme_image: bytes) -> bool:
 | |
|         """
 | |
|         Replace stored image with converted PNG
 | |
|         Args:
 | |
|             meme_id (int)
 | |
|             meme_image (bytes)
 | |
|         Returns:
 | |
|             bool
 | |
|         """
 | |
|         update_query: str = "UPDATE memes SET image = ?, file_ext = 'PNG' WHERE id = ?"
 | |
|         params: tuple = (
 | |
|             meme_image,
 | |
|             meme_id,
 | |
|         )
 | |
|         try:
 | |
|             async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
 | |
|                 update = await db_conn.execute_insert(update_query, params)
 | |
|                 if not update:
 | |
|                     logging.info(
 | |
|                         "replace_with_converted_png: Failed -> Update: %s\nFor meme_id: %s",
 | |
|                         update,
 | |
|                         meme_id,
 | |
|                     )
 | |
|                     return False
 | |
|                 else:
 | |
|                     return True
 | |
|         except Exception as e:
 | |
|             logging.info("replace_with_converted_png: %s", str(e))
 | |
|             traceback.print_exc()
 | |
|             return False
 |