#!/usr/bin/env python3.12 # pylint: disable=broad-exception-caught, bare-except, invalid-name import sys from os import path sys.path.append( path.dirname( path.dirname( path.abspath(__file__) ) ) ) import constants import traceback import time import importlib import logging from typing import Optional import discord import regex from regex import Pattern from aiohttp import ClientSession, ClientTimeout from discord.ext import bridge, commands, tasks from disc_havoc import Havoc class Util: """Karma Utility""" def __init__(self, bot: Havoc): self.bot: Havoc = bot self.api_key: str = constants.PRV_API_KEY self.karma_endpoints_base_url: str = "https://api.codey.lol/karma/" self.karma_retrieval_url: str = f"{self.karma_endpoints_base_url}get" self.karma_update_url: str = f"{self.karma_endpoints_base_url}modify" self.karma_top_10_url: str = f"{self.karma_endpoints_base_url}top" self.timers: dict = {} # discord uid : timestamp, used for rate limiting self.karma_cooldown: int = 15 # 15 seconds between karma updates async def get_karma(self, keyword: str) -> int: """ Get Karma for Keyword Args: keyword (str) Returns: int|str """ try: async with ClientSession() as session: async with await session.post(self.karma_retrieval_url, json={'keyword': keyword}, headers={ 'content-type': 'application/json; charset=utf-8', 'X-Authd-With': f'Bearer {constants.KARMA_API_KEY}', }, timeout=ClientTimeout(connect=3, sock_read=5)) as request: resp = await request.json() return resp.get('count') except Exception as e: traceback.print_exc() return False async def get_top(self, n: int = 10) -> Optional[dict]: """ Get top (n=10) Karma Args: n (int): Number of top results to return, default 10 Returns: Optional[dict] """ try: async with ClientSession() as session: async with await session.post(self.karma_top_10_url, json = { 'n': n, }, headers={ 'content-type': 'application/json; charset=utf-8', 'X-Authd-With': f'Bearer {constants.KARMA_API_KEY}' }, timeout=ClientTimeout(connect=3, sock_read=5)) as request: resp: dict = await request.json() return resp except: traceback.print_exc() return None async def get_top_embed(self, n:int = 10) -> Optional[discord.Embed]: """ Get Top Karma Embed Args: n (int): Number of top results to return, default 10 Returns: discord.Embed """ top: Optional[dict] = await self.get_top(n) if not top: return None top_formatted: str = "" for x, item in enumerate(top): top_formatted += f"{x+1}. **{discord.utils.escape_markdown(item[0])}**: *{item[1]}*\n" top_formatted = top_formatted.strip() embed: discord.Embed = discord.Embed(title=f"Top {n} Karma", description=top_formatted, colour=0xff00ff) return embed async def update_karma(self, display: str, _id: int, keyword: str, flag: int) -> bool: """ Update Karma for Keyword Args: display (str): Display name of the user who requested the update _id (int): Discord UID of the user who requested the update keyword (str): Keyword to update flag (int) """ if not flag in [0, 1]: return False reqObj: dict = { 'granter': f"Discord: {display} ({_id})", 'keyword': keyword, 'flag': flag, } try: async with ClientSession() as session: async with await session.post(self.karma_update_url, json=reqObj, headers={ 'content-type': 'application/json; charset=utf-8', 'X-Authd-With': f'Bearer {self.api_key}', }, timeout=ClientTimeout(connect=3, sock_read=5)) as request: result = await request.json() return result.get('success', False) except: traceback.print_exc() return False async def check_cooldown(self, user_id: int) -> bool: """ Check if member has met cooldown period prior to adjusting karma Args: user_id (int): The Discord UID to check Returns: bool """ if not user_id in self.timers: return True now = int(time.time()) if (now - self.timers[user_id]) < self.karma_cooldown: return False return True class Karma(commands.Cog): """Karma Cog for Havoc""" def __init__(self, bot: Havoc): importlib.reload(constants) self.bot: Havoc = bot self.util = Util(self.bot) # self.karma_regex = regex.compile(r'(\w+)(\+\+|\-\-)') self.karma_regex: Pattern = regex.compile(r'(\b\w+(?:\s+\w+)*)(\+\+($|\s)|\-\-($|\s))') self.mention_regex: Pattern = regex.compile(r'(<@([0-9]{17,20})>)(\+\+|\-\-)') self.mention_regex_no_flag: Pattern = regex.compile(r'(<@([0-9]{17,20})>+)') self.karma_chanid: int = 1307065684785893406 self.karma_msgid: int = 1325442184572567686 # asyncio.get_event_loop().create_task(self.bot.get_channel(self.karma_chanid).send(".")) try: self.update_karma_chan.start() except Exception as e: pass @tasks.loop(seconds=30, reconnect=True) async def update_karma_chan(self) -> None: """Update the Karma Chan Leaderboard""" try: top_embed = await self.util.get_top_embed(n=25) channel = self.bot.get_channel(self.karma_chanid) if not isinstance(channel, discord.TextChannel): return message_to_edit = await channel.fetch_message(self.karma_msgid) await message_to_edit.edit(embed=top_embed, content="## This message will automatically update periodically.") except: traceback.print_exc() @commands.Cog.listener() async def on_message(self, message: discord.Message) -> None: """ Message hook, to monitor for ++/-- Also monitors for messages to #karma to autodelete, only Havoc may post in #karma! """ if not self.bot.user: # No valid client instance return if not isinstance(message.channel, discord.TextChannel): return if message.channel.id == self.karma_chanid and not message.author.id == self.bot.user.id: """Message to #karma not by Havoc, delete it""" await message.delete(reason="Messages to #karma are not allowed") removal_embed: discord.Embed = discord.Embed( title="Message Deleted", description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed." ) await message.author.send(embed=removal_embed) if message.author.id == self.bot.user.id: # Bots own message return if not message.guild: return if not message.guild.id in [1145182936002482196, 1228740575235149855]: # Not a valid guild for cmd return message_content: str = message.content.strip() mentions: list = regex.findall(self.mention_regex, message_content) for mention in mentions: try: logging.debug("Mention: %s", mention) mentioned_uid: int = int(mention[1]) friendly_flag: int = int(mention[2]) guild: Optional[discord.Guild] = self.bot.get_guild(message.guild.id) if not guild: return guild_member: Optional[discord.Member] = guild.get_member(mentioned_uid) if not guild_member: return display: str = guild_member.display_name message_content = message_content.replace(mention[0], display) logging.debug("New message: %s", message_content) except: traceback.print_exc() message_content = discord.utils.escape_markdown(message_content) karma_regex: list[str] = regex.findall(self.karma_regex, message_content.strip()) if not karma_regex: # Not a request to adjust karma return flooding: bool = not await self.util.check_cooldown(message.author.id) exempt_uids: list[int] = [1172340700663255091, 992437729927376996] if flooding and not message.author.id in exempt_uids: return await message.add_reaction(emoji="❗") processed_keywords_lc: list[str] = [] logging.debug("Matched: %s", karma_regex) for matched_keyword in karma_regex: if not isinstance(matched_keyword, tuple): continue if len(matched_keyword) == 4: (keyword, friendly_flag, _, __) = matched_keyword else: (keyword, friendly_flag) = matched_keyword now: int = int(time.time()) flag: int = None match friendly_flag: case "++": flag = 0 case "--": flag = 1 case _: logging.info("Unknown flag %s", flag) continue if keyword.lower() in processed_keywords_lc: continue processed_keywords_lc.append(keyword.lower()) self.util.timers[message.author.id] = now updated: bool = await self.util.update_karma(message.author.display_name, message.author.id, keyword, flag) if updated: return await message.add_reaction(emoji="👍") @bridge.bridge_command() async def karma(self, ctx, *, keyword: str | None = None) -> None: """With no arguments, top 10 karma is provided; a keyword can also be provided to lookup.""" try: if not keyword: top_10_embed: Optional[discord.Embed] = await self.util.get_top_embed() if not top_10_embed: return return await ctx.respond(embed=top_10_embed) keyword = discord.utils.escape_markdown(keyword) mentions: list[str] = regex.findall(self.mention_regex_no_flag, keyword) for mention in mentions: try: mentioned_uid = int(mention[1]) guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) if not guild: return guild_member: Optional[discord.Member] = guild.get_member(mentioned_uid) if not guild_member: return display = guild_member.display_name keyword = keyword.replace(mention[0], display) except: traceback.print_exc() continue score: int = await self.util.get_karma(keyword) description: str = f"**{keyword}** has a karma of *{score}*" embed: discord.Embed = discord.Embed(title=f"Karma for {keyword}", description=description) return await ctx.respond(embed=embed) except Exception as e: await ctx.respond(f"Error: {str(e)}") traceback.print_exc() def cog_unload(self) -> None: try: self.update_karma_chan.cancel() except: """Safe to ignore""" pass def setup(bot) -> None: """Run on Cog Load""" bot.add_cog(Karma(bot))