meme lb
This commit is contained in:
		
							
								
								
									
										182
									
								
								cogs/meme.py
									
									
									
									
									
								
							
							
						
						
									
										182
									
								
								cogs/meme.py
									
									
									
									
									
								
							| @@ -4,7 +4,11 @@ import json | ||||
| import io | ||||
| import asyncio | ||||
| import random | ||||
| from typing import LiteralString, Optional, Any | ||||
| from typing import (LiteralString, | ||||
|                     Optional,  | ||||
|                     Any, | ||||
|                     Union) | ||||
| import aiosqlite as sqlite3 | ||||
| import logging | ||||
| import textwrap | ||||
| import regex | ||||
| @@ -27,6 +31,10 @@ import constants | ||||
| meme_choices = [] | ||||
| BOT_CHANIDS = [] | ||||
|  | ||||
| """ | ||||
| TODO: Cleanup new meme leaderboard stuff | ||||
| """ | ||||
|  | ||||
| class Helper: | ||||
|     """Meme Helper""" | ||||
|     def load_meme_choices(self) -> None: | ||||
| @@ -103,6 +111,8 @@ class Meme(commands.Cog): | ||||
|  | ||||
|     def __init__(self, bot: Havoc) -> None: | ||||
|         self.bot: Havoc = bot | ||||
|         self.stats_db_path: LiteralString = os.path.join("/usr/local/share", | ||||
|                                           "sqlite_dbs", "stats.db") | ||||
|         self.meme_choices: list = [] | ||||
|         self.meme_counter: int = 0 | ||||
|         self.THREADS: dict[str, dict[int, list]] = { | ||||
| @@ -140,6 +150,8 @@ class Meme(commands.Cog): | ||||
|  | ||||
|         self.meme_stream_loop.start() | ||||
|         self.explosm_loop.start() | ||||
|         self.update_meme_lb.start() | ||||
|         asyncio.get_event_loop().create_task(self.init_meme_leaderboard()) | ||||
|          | ||||
|     def is_spamchan() -> bool: # type: ignore | ||||
|         """Check if channel is spamchan""" | ||||
| @@ -153,7 +165,50 @@ class Meme(commands.Cog): | ||||
|                 return False | ||||
|         return commands.check(predicate) # type: ignore | ||||
|      | ||||
|     async def leaderboard_increment(self,  | ||||
|                                     uid: int) -> None: | ||||
|         """ | ||||
|         Increment leaderboard for uid | ||||
|         Args: | ||||
|             uid (int):  | ||||
|         Returns: | ||||
|                 None | ||||
|         """ | ||||
|          | ||||
|         logging.critical("INCR FOR %s", uid) | ||||
|          | ||||
|         if not uid in self.meme_leaderboard: | ||||
|             self.meme_leaderboard[uid] = 1 | ||||
|         else: | ||||
|             self.meme_leaderboard[uid] += 1 | ||||
|          | ||||
|         async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn: | ||||
|             query: str = "INSERT OR REPLACE INTO memes (discord_uid, count) VALUES (?, ?)" | ||||
|             params: tuple = (uid, self.meme_leaderboard[uid]) | ||||
|             async with db_conn.execute(query, params) as db_cursor: | ||||
|                 await db_conn.commit() | ||||
|                  | ||||
|         logging.critical("DONE") | ||||
|          | ||||
|     async def init_meme_leaderboard(self) -> None: | ||||
|         """ | ||||
|         INIT MEME LEADERBOARD | ||||
|         """ | ||||
|         self.meme_leaderboard: dict [int, int] = {} | ||||
|         async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn: | ||||
|             db_conn.row_factory = sqlite3.Row | ||||
|             db_query: str = "SELECT discord_uid, count FROM memes WHERE count > 0" | ||||
|             async with db_conn.execute(db_query) as db_cursor: | ||||
|                 results = await db_cursor.fetchall() | ||||
|                 for result in results: | ||||
|                     uid = result['discord_uid'] | ||||
|                     count = result['count'] | ||||
|                     self.meme_leaderboard[uid] = count      | ||||
|      | ||||
|     @commands.Cog.listener() | ||||
|     async def on_ready(self) -> None: | ||||
|         """Run on Bot Ready""" | ||||
|         await self.init_meme_leaderboard() | ||||
|          | ||||
|     async def do_autos(self, only_comics: Optional[bool] = False) -> None: | ||||
|         """ | ||||
| @@ -485,9 +540,134 @@ class Meme(commands.Cog): | ||||
|             await ctx.respond("Fuck! :(", ephemeral=True) | ||||
|             traceback.print_exc() | ||||
|              | ||||
|     @commands.Cog.listener() | ||||
|     async def on_message(self, message: discord.Message) -> None: | ||||
|         """ | ||||
|         Message hook, to monitor for memes | ||||
|         Also monitors for messages to #memes-top-10 to autodelete, only Havoc may post in #memes-top-10! | ||||
|         """ | ||||
|         lb_chanid: int = 1352373745108652145 | ||||
|         if not self.bot.user: # No valid client instance | ||||
|             return | ||||
|         if not isinstance(message.channel, discord.TextChannel): | ||||
|             return | ||||
|         if message.channel.id == lb_chanid\ | ||||
|             and not message.author.id == self.bot.user.id: | ||||
|             """Message to #memes-top-10 not by Havoc, delete it""" | ||||
|             await message.delete(reason=f"Messages to #{message.channel.name} are not allowed") | ||||
|             removal_embed: discord.Embed = discord.Embed( | ||||
|                 title="Message Deleted", | ||||
|                 description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed." | ||||
|             ) | ||||
|             await message.author.send(embed=removal_embed) | ||||
|          | ||||
|         if message.author.id == self.bot.user.id: # Bots own message | ||||
|             return | ||||
|         if not message.guild: | ||||
|             return | ||||
|         if not message.channel.id == 1147229098544988261: # Not meme channel | ||||
|             return | ||||
|         if not message.attachments: # No attachments to consider a meme | ||||
|             return  | ||||
|          | ||||
|         await self.leaderboard_increment(message.author.id) | ||||
|  | ||||
|     async def get_top(self, n: int = 10) -> Optional[list[tuple]]: | ||||
|         """ | ||||
|         Get top (n=10) Memes | ||||
|          | ||||
|         Args: | ||||
|             n (int): Number of top results to return, default 10 | ||||
|         Returns: | ||||
|             Optional[dict]  | ||||
|         """ | ||||
|         try: | ||||
|             out_top: list[tuple[int, int]] = [] | ||||
|             async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn: | ||||
|                 db_conn.row_factory = sqlite3.Row | ||||
|                 query: str = "SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC" | ||||
|                 async with db_conn.execute(query) as db_cursor: | ||||
|                     db_result = await db_cursor.fetchall() | ||||
|                     for res in db_result: | ||||
|                         uid = res['discord_uid'] | ||||
|                         count = res['count'] | ||||
|                         out_top.append((uid, count)) | ||||
|             # Check for and remove missing members | ||||
|             guild_id: int = 1145182936002482196 | ||||
|             guild: Optional[discord.Guild] = self.bot.get_guild(guild_id) | ||||
|             if not guild: | ||||
|                 return None | ||||
|             for x, entry in enumerate(out_top): | ||||
|                 (uid, _) = entry | ||||
|                 member: Optional[discord.Member] = guild.get_member(uid) | ||||
|                 if not member: | ||||
|                     out_top.pop(x) | ||||
|             return out_top[0:(n+1)]  | ||||
|         except: | ||||
|             traceback.print_exc() | ||||
|             return None | ||||
|          | ||||
|     async def get_top_embed(self, n:int = 10) -> Optional[discord.Embed]: | ||||
|         """ | ||||
|         Get Top Memes Embed | ||||
|          | ||||
|         Args: | ||||
|             n (int): Number of top results to return, default 10 | ||||
|         Returns: | ||||
|             Optional[discord.Embed]  | ||||
|         """ | ||||
|         guild_id: int = 1145182936002482196 | ||||
|         guild: Optional[discord.Guild] = self.bot.get_guild(guild_id) | ||||
|         if not guild: | ||||
|             return None | ||||
|         top: Optional[list[tuple]] = await self.get_top(n) | ||||
|         if not top: | ||||
|             return None | ||||
|         top_formatted: str = "" | ||||
|         for x, item in enumerate(top): | ||||
|             (uid, count) = item | ||||
|             member: Optional[discord.Member] = guild.get_member(uid) | ||||
|             if not member: | ||||
|                 continue | ||||
|             display_name: str = member.display_name | ||||
|             top_formatted += f"{x+1}. **{discord.utils.escape_markdown(display_name)}**: *{count}*\n" | ||||
|         top_formatted = top_formatted.strip() | ||||
|         embed: discord.Embed = discord.Embed(title=f"Top {n} Memes", | ||||
|                             description=top_formatted, | ||||
|                             colour=0xff00ff) | ||||
|         return embed     | ||||
|      | ||||
|     @tasks.loop(seconds=30, reconnect=True) | ||||
|     async def update_meme_lb(self) -> None: | ||||
|         """Update the Meme Leaderboard""" | ||||
|         try: | ||||
|             lb_chanid: int = 1352373745108652145 | ||||
|             message_id: int = 1352440888231723070 | ||||
|             top_embed = await self.get_top_embed(n=10) | ||||
|             channel = self.bot.get_channel(lb_chanid) | ||||
|             if not isinstance(channel, discord.TextChannel): | ||||
|                 return | ||||
|             message_to_edit = await channel.fetch_message(message_id) | ||||
|             await message_to_edit.edit(embed=top_embed, | ||||
|                                     content="## This message will automatically update periodically.") | ||||
|         except: | ||||
|             traceback.print_exc()     | ||||
|      | ||||
|     @bridge.bridge_command(hidden=True) | ||||
|     @commands.is_owner() | ||||
|     async def doembed(self, ctx) -> None: | ||||
|         """Do Meme Embed""" | ||||
|         meme_lb_chan_id: int = 1352373745108652145 | ||||
|         meme_lb_chan: Union[discord.TextChannel, Any] = self.bot.get_channel(meme_lb_chan_id) | ||||
|         embed = await self.get_top_embed() | ||||
|         if embed: | ||||
|             await meme_lb_chan.send(embed=embed) | ||||
|         else: | ||||
|             await ctx.respond("NO embed :(") | ||||
|     def cog_unload(self) -> None: | ||||
|         self.meme_stream_loop.cancel() | ||||
|         self.explosm_loop.cancel() | ||||
|         self.update_meme_lb.cancel() | ||||
|          | ||||
| def setup(bot) -> None: | ||||
|     """Run on Cog Load""" | ||||
|   | ||||
| @@ -1215,6 +1215,7 @@ class Misc(commands.Cog): | ||||
|                 logging.debug("Failed to add puke reactin for touch command: %s", | ||||
|                              str(e)) | ||||
|             await self.util.increment_counter("touch_denials") | ||||
|             return | ||||
|         else: | ||||
|             recipient_normal = recipient | ||||
|             if discord.utils.raw_mentions(recipient): | ||||
|   | ||||
		Reference in New Issue
	
	Block a user