151 lines
5.2 KiB
Python
151 lines
5.2 KiB
Python
import os
|
|
import logging
|
|
import io
|
|
import math
|
|
from typing import Optional
|
|
import aiosqlite as sqlite3
|
|
from PIL import Image
|
|
|
|
|
|
class MemeUtil:
|
|
"""
|
|
Meme Utils
|
|
"""
|
|
|
|
def __init__(self, constants) -> None:
|
|
self.constants = constants
|
|
self.meme_db_path = os.path.join("/mnt/data/share", "sqlite_dbs", "meme.db")
|
|
|
|
def is_png(self, buffer: bytes | io.BytesIO) -> bool:
|
|
"""
|
|
Check if image (in-memory buffer, or bytes) is a PNG
|
|
Args:
|
|
buffer (bytes|io.BytesIO)
|
|
Returns:
|
|
bool
|
|
"""
|
|
# Accepts either bytes or a BytesIO-like object
|
|
if isinstance(buffer, io.BytesIO):
|
|
if hasattr(buffer, "read") and hasattr(buffer, "seek"):
|
|
pos = buffer.tell()
|
|
buffer.seek(0)
|
|
signature = buffer.read(8)
|
|
buffer.seek(pos)
|
|
else:
|
|
signature = buffer[:8]
|
|
return signature == b"\x89PNG\r\n\x1a\n"
|
|
|
|
def convert_to_png(self, in_buffer: io.BytesIO) -> bytes:
|
|
"""
|
|
Convert an in-memory buffer to PNG
|
|
Args:
|
|
in_buffer (io.BytesIO)
|
|
Returns:
|
|
bytes
|
|
"""
|
|
in_buffer.seek(0)
|
|
with Image.open(in_buffer) as im:
|
|
if im.format == "PNG":
|
|
raise ValueError("Already a PNG")
|
|
out_buffer = io.BytesIO()
|
|
im.save(out_buffer, format="PNG")
|
|
out_buffer.seek(0)
|
|
return out_buffer.read()
|
|
|
|
async def get_meme_by_id(self, meme_id: int) -> Optional[bytes]:
|
|
"""
|
|
Get meme by id
|
|
Args:
|
|
meme_id (int)
|
|
Returns:
|
|
Optional[bytes]
|
|
"""
|
|
ret_image: Optional[bytes] = None
|
|
buffer: Optional[io.BytesIO] = None
|
|
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
|
|
db_conn.row_factory = sqlite3.Row
|
|
query: str = "SELECT image FROM memes WHERE id = ? LIMIT 1"
|
|
async with await db_conn.execute(query, (meme_id,)) as db_cursor:
|
|
result = await db_cursor.fetchone()
|
|
if not result:
|
|
return None
|
|
buffer = io.BytesIO(result["image"])
|
|
is_png = self.is_png(buffer)
|
|
if not is_png:
|
|
logging.debug("Converting %s, not detected as PNG", meme_id)
|
|
ret_image = self.convert_to_png(buffer)
|
|
else:
|
|
ret_image = result["image"]
|
|
return ret_image
|
|
|
|
async def get_random_meme(self) -> Optional[bytes]:
|
|
"""
|
|
Get random meme
|
|
Returns:
|
|
Optional[bytes]
|
|
"""
|
|
ret_image: Optional[bytes] = None
|
|
buffer: Optional[io.BytesIO] = None
|
|
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
|
|
db_conn.row_factory = sqlite3.Row
|
|
query: str = "SELECT id, image FROM memes ORDER BY RANDOM() LIMIT 1"
|
|
async with await db_conn.execute(query) as db_cursor:
|
|
result = await db_cursor.fetchone()
|
|
if not result:
|
|
return None
|
|
meme_id = result["id"]
|
|
buffer = io.BytesIO(result["image"])
|
|
is_png = self.is_png(buffer)
|
|
if not is_png:
|
|
logging.debug("Converting %s, not detected as PNG", meme_id)
|
|
ret_image = self.convert_to_png(buffer)
|
|
else:
|
|
ret_image = result["image"]
|
|
return ret_image
|
|
|
|
async def list_memes(self, page: int) -> Optional[list]:
|
|
"""
|
|
List memes (paginated)
|
|
Args:
|
|
page (id)
|
|
Returns:
|
|
Optional[list]
|
|
"""
|
|
out_result: list = []
|
|
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
|
|
db_conn.row_factory = sqlite3.Row
|
|
rows_per_page: int = 10
|
|
offset: int = (page - 1) * rows_per_page
|
|
query: str = "SELECT id, timestamp FROM memes ORDER BY timestamp DESC LIMIT 10 OFFSET ?"
|
|
async with await db_conn.execute(query, (offset,)) as db_cursor:
|
|
results = await db_cursor.fetchall()
|
|
for result in results:
|
|
result_id = result["id"]
|
|
result_timestamp = result["timestamp"]
|
|
out_result.append(
|
|
{
|
|
"id": result_id,
|
|
"timestamp": result_timestamp,
|
|
}
|
|
)
|
|
return out_result
|
|
|
|
async def get_page_count(self) -> Optional[int]:
|
|
"""
|
|
Get page count
|
|
Returns:
|
|
Optional[int]
|
|
"""
|
|
async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn:
|
|
db_conn.row_factory = sqlite3.Row
|
|
rows_per_page: int = 10
|
|
pages: Optional[int] = None
|
|
query: str = "SELECT count(id) AS count FROM memes"
|
|
async with await db_conn.execute(query) as db_cursor:
|
|
result = await db_cursor.fetchone()
|
|
count = result["count"]
|
|
if not isinstance(count, int):
|
|
return None
|
|
pages = math.ceil(count / rows_per_page)
|
|
return pages
|