From 61895c8e854f7cee9c0b3764a12896b49d1c437f Mon Sep 17 00:00:00 2001 From: codey Date: Thu, 20 Mar 2025 20:49:23 -0400 Subject: [PATCH] meme lb --- cogs/meme.py | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++- cogs/misc.py | 1 + 2 files changed, 184 insertions(+), 3 deletions(-) diff --git a/cogs/meme.py b/cogs/meme.py index b81878f..5e7cf70 100644 --- a/cogs/meme.py +++ b/cogs/meme.py @@ -4,7 +4,11 @@ import json import io import asyncio import random -from typing import LiteralString, Optional, Any +from typing import (LiteralString, + Optional, + Any, + Union) +import aiosqlite as sqlite3 import logging import textwrap import regex @@ -27,6 +31,10 @@ import constants meme_choices = [] BOT_CHANIDS = [] +""" +TODO: Cleanup new meme leaderboard stuff +""" + class Helper: """Meme Helper""" def load_meme_choices(self) -> None: @@ -103,6 +111,8 @@ class Meme(commands.Cog): def __init__(self, bot: Havoc) -> None: self.bot: Havoc = bot + self.stats_db_path: LiteralString = os.path.join("/usr/local/share", + "sqlite_dbs", "stats.db") self.meme_choices: list = [] self.meme_counter: int = 0 self.THREADS: dict[str, dict[int, list]] = { @@ -140,7 +150,9 @@ class Meme(commands.Cog): self.meme_stream_loop.start() self.explosm_loop.start() - + self.update_meme_lb.start() + asyncio.get_event_loop().create_task(self.init_meme_leaderboard()) + def is_spamchan() -> bool: # type: ignore """Check if channel is spamchan""" def predicate(ctx): @@ -153,8 +165,51 @@ class Meme(commands.Cog): return False return commands.check(predicate) # type: ignore + async def leaderboard_increment(self, + uid: int) -> None: + """ + Increment leaderboard for uid + Args: + uid (int): + Returns: + None + """ + + logging.critical("INCR FOR %s", uid) + + if not uid in self.meme_leaderboard: + self.meme_leaderboard[uid] = 1 + else: + self.meme_leaderboard[uid] += 1 + + async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn: + query: str = "INSERT OR REPLACE INTO memes (discord_uid, count) VALUES (?, ?)" + params: tuple = (uid, self.meme_leaderboard[uid]) + async with db_conn.execute(query, params) as db_cursor: + await db_conn.commit() + + logging.critical("DONE") + + async def init_meme_leaderboard(self) -> None: + """ + INIT MEME LEADERBOARD + """ + self.meme_leaderboard: dict [int, int] = {} + async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn: + db_conn.row_factory = sqlite3.Row + db_query: str = "SELECT discord_uid, count FROM memes WHERE count > 0" + async with db_conn.execute(db_query) as db_cursor: + results = await db_cursor.fetchall() + for result in results: + uid = result['discord_uid'] + count = result['count'] + self.meme_leaderboard[uid] = count - + @commands.Cog.listener() + async def on_ready(self) -> None: + """Run on Bot Ready""" + await self.init_meme_leaderboard() + async def do_autos(self, only_comics: Optional[bool] = False) -> None: """ Run Auto Posters @@ -484,10 +539,135 @@ class Meme(commands.Cog): except: await ctx.respond("Fuck! :(", ephemeral=True) traceback.print_exc() + + @commands.Cog.listener() + async def on_message(self, message: discord.Message) -> None: + """ + Message hook, to monitor for memes + Also monitors for messages to #memes-top-10 to autodelete, only Havoc may post in #memes-top-10! + """ + lb_chanid: int = 1352373745108652145 + if not self.bot.user: # No valid client instance + return + if not isinstance(message.channel, discord.TextChannel): + return + if message.channel.id == lb_chanid\ + and not message.author.id == self.bot.user.id: + """Message to #memes-top-10 not by Havoc, delete it""" + await message.delete(reason=f"Messages to #{message.channel.name} are not allowed") + removal_embed: discord.Embed = discord.Embed( + title="Message Deleted", + description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed." + ) + await message.author.send(embed=removal_embed) + + if message.author.id == self.bot.user.id: # Bots own message + return + if not message.guild: + return + if not message.channel.id == 1147229098544988261: # Not meme channel + return + if not message.attachments: # No attachments to consider a meme + return + + await self.leaderboard_increment(message.author.id) + + async def get_top(self, n: int = 10) -> Optional[list[tuple]]: + """ + Get top (n=10) Memes + + Args: + n (int): Number of top results to return, default 10 + Returns: + Optional[dict] + """ + try: + out_top: list[tuple[int, int]] = [] + async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn: + db_conn.row_factory = sqlite3.Row + query: str = "SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC" + async with db_conn.execute(query) as db_cursor: + db_result = await db_cursor.fetchall() + for res in db_result: + uid = res['discord_uid'] + count = res['count'] + out_top.append((uid, count)) + # Check for and remove missing members + guild_id: int = 1145182936002482196 + guild: Optional[discord.Guild] = self.bot.get_guild(guild_id) + if not guild: + return None + for x, entry in enumerate(out_top): + (uid, _) = entry + member: Optional[discord.Member] = guild.get_member(uid) + if not member: + out_top.pop(x) + return out_top[0:(n+1)] + except: + traceback.print_exc() + return None + + async def get_top_embed(self, n:int = 10) -> Optional[discord.Embed]: + """ + Get Top Memes Embed + + Args: + n (int): Number of top results to return, default 10 + Returns: + Optional[discord.Embed] + """ + guild_id: int = 1145182936002482196 + guild: Optional[discord.Guild] = self.bot.get_guild(guild_id) + if not guild: + return None + top: Optional[list[tuple]] = await self.get_top(n) + if not top: + return None + top_formatted: str = "" + for x, item in enumerate(top): + (uid, count) = item + member: Optional[discord.Member] = guild.get_member(uid) + if not member: + continue + display_name: str = member.display_name + top_formatted += f"{x+1}. **{discord.utils.escape_markdown(display_name)}**: *{count}*\n" + top_formatted = top_formatted.strip() + embed: discord.Embed = discord.Embed(title=f"Top {n} Memes", + description=top_formatted, + colour=0xff00ff) + return embed + @tasks.loop(seconds=30, reconnect=True) + async def update_meme_lb(self) -> None: + """Update the Meme Leaderboard""" + try: + lb_chanid: int = 1352373745108652145 + message_id: int = 1352440888231723070 + top_embed = await self.get_top_embed(n=10) + channel = self.bot.get_channel(lb_chanid) + if not isinstance(channel, discord.TextChannel): + return + message_to_edit = await channel.fetch_message(message_id) + await message_to_edit.edit(embed=top_embed, + content="## This message will automatically update periodically.") + except: + traceback.print_exc() + + @bridge.bridge_command(hidden=True) + @commands.is_owner() + async def doembed(self, ctx) -> None: + """Do Meme Embed""" + meme_lb_chan_id: int = 1352373745108652145 + meme_lb_chan: Union[discord.TextChannel, Any] = self.bot.get_channel(meme_lb_chan_id) + embed = await self.get_top_embed() + if embed: + await meme_lb_chan.send(embed=embed) + else: + await ctx.respond("NO embed :(") def cog_unload(self) -> None: self.meme_stream_loop.cancel() self.explosm_loop.cancel() + self.update_meme_lb.cancel() def setup(bot) -> None: """Run on Cog Load""" diff --git a/cogs/misc.py b/cogs/misc.py index 299a1d6..ae6bbe5 100644 --- a/cogs/misc.py +++ b/cogs/misc.py @@ -1215,6 +1215,7 @@ class Misc(commands.Cog): logging.debug("Failed to add puke reactin for touch command: %s", str(e)) await self.util.increment_counter("touch_denials") + return else: recipient_normal = recipient if discord.utils.raw_mentions(recipient):