From 6463ace40cde73230615f155603e9c04ea9270c0 Mon Sep 17 00:00:00 2001 From: codey Date: Sat, 15 Feb 2025 13:57:47 -0500 Subject: [PATCH] if recipient is None --- cogs/karma.py | 66 ++++-- cogs/lovehate.py | 102 ++++++--- cogs/misc.py | 542 +++++++++++++++++++++++++++----------------- cogs/misc_util.py | 223 +++++++++--------- cogs/quote.py | 321 -------------------------- cogs/radio.py | 37 ++- constructors.py | 9 + disc_havoc.py | 3 +- util/lovehate_db.py | 30 +-- 9 files changed, 617 insertions(+), 716 deletions(-) delete mode 100644 cogs/quote.py diff --git a/cogs/karma.py b/cogs/karma.py index f9a0d3a..a085415 100644 --- a/cogs/karma.py +++ b/cogs/karma.py @@ -9,6 +9,7 @@ import traceback import time import importlib import logging +from typing import Optional import discord import regex from regex import Pattern @@ -30,7 +31,7 @@ class Util: self.timers: dict = {} # discord uid : timestamp, used for rate limiting self.karma_cooldown: int = 15 # 15 seconds between karma updates - async def get_karma(self, keyword: str) -> int|str: + async def get_karma(self, keyword: str) -> int: """ Get Karma for Keyword Args: @@ -50,15 +51,15 @@ class Util: return resp.get('count') except Exception as e: traceback.print_exc() - return f"Failed-- {type(e).__name__}: {str(e)}" + return False - async def get_top(self, n: int = 10) -> dict: + async def get_top(self, n: int = 10) -> Optional[dict]: """ Get top (n=10) Karma Args: n (int): Number of top results to return, default 10 Returns: - dict + Optional[dict] """ try: async with ClientSession() as session: @@ -74,8 +75,9 @@ class Util: return resp except: traceback.print_exc() + return None - async def get_top_embed(self, n:int = 10) -> discord.Embed: + async def get_top_embed(self, n:int = 10) -> Optional[discord.Embed]: """ Get Top Karma Embed Args: @@ -83,11 +85,13 @@ class Util: Returns: discord.Embed """ - top: dict = await self.get_top(n) + top: Optional[dict] = await self.get_top(n) + if not top: + return None top_formatted: str = "" for x, item in enumerate(top): top_formatted += f"{x+1}. **{discord.utils.escape_markdown(item[0])}**: *{item[1]}*\n" - top_formatted: str = top_formatted.strip() + top_formatted = top_formatted.strip() embed: discord.Embed = discord.Embed(title=f"Top {n} Karma", description=top_formatted, colour=0xff00ff) @@ -103,7 +107,7 @@ class Util: flag (int) """ if not flag in [0, 1]: - return + return False reqObj: dict = { 'granter': f"Discord: {display} ({_id})", @@ -171,6 +175,8 @@ class Karma(commands.Cog): try: top_embed = await self.util.get_top_embed(n=25) channel = self.bot.get_channel(self.karma_chanid) + if not isinstance(channel, discord.TextChannel): + return message_to_edit = await channel.fetch_message(self.karma_msgid) await message_to_edit.edit(embed=top_embed, content="## This message will automatically update periodically.") @@ -184,7 +190,10 @@ class Karma(commands.Cog): Message hook, to monitor for ++/-- Also monitors for messages to #karma to autodelete, only Havoc may post in #karma! """ - + if not self.bot.user: # No valid client instance + return + if not isinstance(message.channel, discord.TextChannel): + return if message.channel.id == self.karma_chanid and not message.author.id == self.bot.user.id: """Message to #karma not by Havoc, delete it""" await message.delete(reason="Messages to #karma are not allowed") @@ -192,12 +201,13 @@ class Karma(commands.Cog): title="Message Deleted", description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed." ) - return await message.author.send(embed=removal_embed) + await message.author.send(embed=removal_embed) if message.author.id == self.bot.user.id: # Bots own message return - + if not message.guild: + return if not message.guild.id in [1145182936002482196, 1228740575235149855]: # Not a valid guild for cmd return @@ -209,15 +219,19 @@ class Karma(commands.Cog): logging.debug("Mention: %s", mention) mentioned_uid: int = int(mention[1]) friendly_flag: int = int(mention[2]) - guild: discord.Guild = self.bot.get_guild(message.guild.id) - guild_member: discord.Member = guild.get_member(mentioned_uid) + guild: Optional[discord.Guild] = self.bot.get_guild(message.guild.id) + if not guild: + return + guild_member: Optional[discord.Member] = guild.get_member(mentioned_uid) + if not guild_member: + return display: str = guild_member.display_name - message_content: str = message_content.replace(mention[0], display) + message_content = message_content.replace(mention[0], display) logging.debug("New message: %s", message_content) except: traceback.print_exc() - message_content: str = discord.utils.escape_markdown(message_content) + message_content = discord.utils.escape_markdown(message_content) karma_regex: list[str] = regex.findall(self.karma_regex, message_content.strip()) if not karma_regex: # Not a request to adjust karma @@ -233,6 +247,8 @@ class Karma(commands.Cog): logging.debug("Matched: %s", karma_regex) for matched_keyword in karma_regex: + if not isinstance(matched_keyword, tuple): + continue if len(matched_keyword) == 4: (keyword, friendly_flag, _, __) = matched_keyword else: @@ -242,9 +258,9 @@ class Karma(commands.Cog): flag: int = None match friendly_flag: case "++": - flag: int = 0 + flag = 0 case "--": - flag: int = 1 + flag = 1 case _: logging.info("Unknown flag %s", flag) continue @@ -266,18 +282,24 @@ class Karma(commands.Cog): """With no arguments, top 10 karma is provided; a keyword can also be provided to lookup.""" try: if not keyword: - top_10_embed: discord.Embed = await self.util.get_top_embed() + top_10_embed: Optional[discord.Embed] = await self.util.get_top_embed() + if not top_10_embed: + return return await ctx.respond(embed=top_10_embed) - keyword: str = discord.utils.escape_markdown(keyword) + keyword = discord.utils.escape_markdown(keyword) mentions: list[str] = regex.findall(self.mention_regex_no_flag, keyword) for mention in mentions: try: mentioned_uid = int(mention[1]) - guild = self.bot.get_guild(ctx.guild.id) - guild_member = guild.get_member(mentioned_uid) + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + guild_member: Optional[discord.Member] = guild.get_member(mentioned_uid) + if not guild_member: + return display = guild_member.display_name keyword = keyword.replace(mention[0], display) except: @@ -286,8 +308,6 @@ class Karma(commands.Cog): score: int = await self.util.get_karma(keyword) description: str = f"**{keyword}** has a karma of *{score}*" - if isinstance(score, dict) and score.get('err'): - description: str = f"*{score.get('errorText')}*" embed: discord.Embed = discord.Embed(title=f"Karma for {keyword}", description=description) return await ctx.respond(embed=embed) diff --git a/cogs/lovehate.py b/cogs/lovehate.py index 84105b1..fb4af68 100644 --- a/cogs/lovehate.py +++ b/cogs/lovehate.py @@ -4,7 +4,7 @@ import traceback import logging import os -from typing import Any, Optional +from typing import Any, Optional, Union import discord import aiosqlite as sqlite3 from discord.ext import bridge, commands @@ -43,11 +43,15 @@ class LoveHate(commands.Cog): """ try: if not user: - loves: list[tuple] = await self.db.get_lovehates(user=ctx.author.display_name, loves=True) + display_name = ctx.author.display_name + loves: Union[list[tuple], bool] = await self.db.get_lovehates(user=display_name, + loves=True) if not loves: return await ctx.respond("You don't seem to love anything...") out_loves: list = [] + if not isinstance(loves, list): + return for love in loves: (love,) = love out_loves.append(love) @@ -55,11 +59,11 @@ class LoveHate(commands.Cog): out_loves_str: str = self.join_with_and(out_loves) return await ctx.respond(f"{ctx.author.mention} loves {out_loves_str}") - loves: list[tuple] = await self.db.get_lovehates(user=user.strip(), loves=True) + loves = await self.db.get_lovehates(user=user.strip(), loves=True) if not loves: return await ctx.respond(f"{user} doesn't seem to love anything...") - out_loves_str: str = self.join_with_and(out_loves) + out_loves_str = self.join_with_and(out_loves) return await ctx.respond(f"{user} loves {out_loves_str}") except Exception as e: traceback.print_exc() @@ -77,15 +81,27 @@ class LoveHate(commands.Cog): """ try: if not thing: - thing: str = ctx.author.display_name - if discord.utils.raw_mentions(thing): + _thing: str = ctx.author.display_name + else: + _thing = thing + if discord.utils.raw_mentions(_thing): # There are mentions - thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention - thing: str = self.bot.get_guild(ctx.guild.id).get_member(thing_id).display_name + thing_id: int = discord.utils.raw_mentions(_thing)[0] # First mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + thing_member: Optional[discord.Member] = guild.get_member(thing_id) + if not thing_member: + return + + _thing = thing_member.display_name - who_loves: list[tuple] = await self.db.get_wholovehates(thing=thing, + if not _thing: + return + + who_loves: Union[list, bool] = await self.db.get_wholovehates(thing=_thing, loves=True) - if not who_loves: + if not isinstance(who_loves, list): return await ctx.respond(f"I couldn't find anyone who loves {thing}...") out_wholoves: list = [] @@ -114,18 +130,28 @@ class LoveHate(commands.Cog): """ try: if not thing: - thing: str = ctx.author.display_name - if discord.utils.raw_mentions(thing): + _thing: str = ctx.author.display_name + else: + _thing = thing + if discord.utils.raw_mentions(_thing): # There are mentions - thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention - thing: str = self.bot.get_guild(ctx.guild.id).get_member(thing_id).display_name + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + thing_id: int = discord.utils.raw_mentions(_thing)[0] # First mention + thing_member: Optional[discord.Member] = guild.get_member(thing_id) + if not thing_member: + return + _thing = thing_member.display_name - who_hates: list[tuple] = await self.db.get_wholovehates(thing=thing, + who_hates: Union[list[tuple], bool] = await self.db.get_wholovehates(thing=_thing, hates=True) if not who_hates: return await ctx.respond(f"I couldn't find anyone who hates {thing}...") out_whohates: list = [] + if not isinstance(who_hates, list): + return for hater in who_hates: (hater,) = hater out_whohates.append(str(hater)) @@ -170,26 +196,24 @@ class LoveHate(commands.Cog): """ try: if not user: - hates: list[tuple] = await self.db.get_lovehates(user=ctx.author.display_name, + display_name = ctx.author.display_name + hates: Union[list[tuple], bool] = await self.db.get_lovehates(user=display_name, hates=True) if not hates: return await ctx.respond("You don't seem to hate anything...") - - out_hates: list = [] - for hated_thing in hates: - (hated_thing,) = hated_thing - out_hates.append(str(hated_thing)) - - out_hates_str: str = self.join_with_and(out_hates) - - return await ctx.respond(f"{ctx.author.mention} hates {out_hates_str}") - - hates: list[tuple] = await self.db.get_lovehates(user=user.strip(), hates=True) - if not hates: - return await ctx.respond(f"{user} doesn't seem to hate anything...") + else: + hates = await self.db.get_lovehates(user=user.strip(), hates=True) + if not hates: + return await ctx.respond(f"{user} doesn't seem to hate anything...") - out_hates_str: str = self.join_with_and(hates) - + out_hates: list = [] + if not isinstance(hates, list): + return + for hated_thing in hates: + (hated_thing,) = hated_thing + out_hates.append(str(hated_thing)) + + out_hates_str: str = self.join_with_and(out_hates) return await ctx.respond(f"{user} hates {out_hates_str}") except Exception as e: await ctx.respond(f"Error: {str(e)}") @@ -209,7 +233,13 @@ class LoveHate(commands.Cog): if discord.utils.raw_mentions(thing): # There are mentions thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention - thing: str = self.bot.get_guild(ctx.guild.id).get_member(thing_id).display_name + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild) + if not guild: + return + thing_member: Optional[discord.Member] = guild.get_member(thing_id) + if not thing_member: + return + thing = thing_member.display_name love: str = await self.db.update(ctx.author.display_name, thing, 1) @@ -232,7 +262,13 @@ class LoveHate(commands.Cog): if discord.utils.raw_mentions(thing): # There are mentions thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention - thing: str = self.bot.get_guild(ctx.guild.id).get_member(thing_id).display_name + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + thing_member: Optional[discord.Member] = guild.get_member(thing_id) + if not thing_member: + return + thing = thing_member.display_name hate: str = await self.db.update(ctx.author.display_name, thing, -1) return await ctx.respond(hate) diff --git a/cogs/misc.py b/cogs/misc.py index b9feeaa..cb4b439 100644 --- a/cogs/misc.py +++ b/cogs/misc.py @@ -8,11 +8,12 @@ import random from typing import Optional, LiteralString import logging import discord -from .misc_util import Util +from cogs.misc_util import Util import aiosqlite as sqlite3 from sh import cowsay as cow_say, fortune # pylint: disable=no-name-in-module from discord.ext import bridge, commands, tasks from disc_havoc import Havoc +from constructors import MiscException # pylint: disable=bare-except, broad-exception-caught, broad-exception-raised, global-statement # pylint: disable=too-many-lines, invalid-name @@ -64,7 +65,7 @@ class Misc(commands.Cog): global BOT_CHANIDS BOT_CHANIDS = self.bot.BOT_CHANIDS - def is_spamchan() -> bool: # pylint: disable=no-method-argument + def is_spamchan() -> bool: # type: ignore """Check if channel is spamchan""" def predicate(ctx): try: @@ -74,9 +75,9 @@ class Misc(commands.Cog): except: traceback.print_exc() return False - return commands.check(predicate) + return commands.check(predicate) # type: ignore - def is_spamchan_or_drugs() -> bool: # pylint: disable=no-method-argument + def is_spamchan_or_drugs() -> bool: # type: ignore """Check if channel is spamchan or drugs chan""" def predicate(ctx): try: @@ -86,10 +87,10 @@ class Misc(commands.Cog): except: traceback.print_exc() return False - return commands.check(predicate) + return commands.check(predicate) # type: ignore - async def get_random_guild_member(self, online_only: Optional[bool] = False) -> str: + async def get_random_guild_member(self, online_only: Optional[bool] = False) -> Optional[str]: """ Get Random Guild Member Args: @@ -97,7 +98,9 @@ class Misc(commands.Cog): Returns: str """ - guild = self.bot.get_guild(1145182936002482196) + guild: Optional[discord.Guild] = self.bot.get_guild(1145182936002482196) + if not guild: + return None if not online_only: guild_members = [str(member.display_name) for member in guild.members if not member.bot] else: @@ -115,14 +118,16 @@ class Misc(commands.Cog): None """ try: - stats_embed: discord.Embed = await self.util.get_stats_embed() + stats_embed: Optional[discord.Embed] = await self.util.get_stats_embed() + if not stats_embed: + return return await ctx.respond(embed=stats_embed) except Exception as e: traceback.print_exc() return await ctx.respond(f"Error: {str(e)}") - @bridge.bridge_command() - @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + @bridge.bridge_command() # type: ignore + @is_spamchan_or_drugs() async def listcoffees(self, ctx) -> None: """ List Available Coffees @@ -142,8 +147,8 @@ class Misc(commands.Cog): traceback.print_exc() return await ctx.respond(f"Error: {str(e)}") - @bridge.bridge_command() - @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + @bridge.bridge_command() # type: ignore + @is_spamchan_or_drugs() async def listshoves(self, ctx) -> None: """ List Available Fates for shove command @@ -187,7 +192,10 @@ class Misc(commands.Cog): '9': '9️⃣', } with ctx.channel.typing(): - (days, hours, minutes, seconds, ms, us) = self.util.get_days_to_xmas() # pylint: disable=unused-variable + countdown = self.util.get_days_to_xmas() + if not isinstance(countdown, tuple) or len(countdown) < 6: + return await ctx.respond("Oops, Christmas is cancelled.") # Invalid countdown from util + (days, hours, minutes, seconds, ms, _) = countdown now: datetime.datetime = datetime.datetime.now() if now.month == 12 and now.day == 25: return await ctx.respond("# IT IS CHRISTMAS!!!!!!!!\n-# keep the change, you filthy animal") @@ -235,19 +243,24 @@ class Misc(commands.Cog): None """ try: + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() else: if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) with ctx.channel.typing(): insult: str = await self.util.get_insult(recipient) if insult: @@ -272,16 +285,21 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() else: if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) with ctx.channel.typing(): compliment: str = await self.util.get_compliment(recipient) if compliment: @@ -304,17 +322,25 @@ class Misc(commands.Cog): """ try: if not recipient: - recipient: str = ctx.author.display_name + recipient = ctx.author.display_name else: if discord.utils.raw_mentions(recipient): # There are mentions - recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name + recipient_id = discord.utils.raw_mentions(recipient)[0] # First mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) - (choice_name, choice_category, choice_description) = await self.util.get_whisky() + whisky: Optional[tuple] = await self.util.get_whisky() + if not whisky: + raise MiscException("Failed to get whisky from db") + (choice_name, choice_category, choice_description) = whisky embed: discord.Embed = discord.Embed(title=f"Whisky for {recipient}: {choice_name}", description=choice_description.strip()) embed.add_field(name="Category", value=choice_category, inline=True) @@ -339,17 +365,27 @@ class Misc(commands.Cog): """ try: if not recipient: - recipient: str = ctx.author.display_name + recipient = ctx.author.display_name else: if discord.utils.raw_mentions(recipient): # There are mentions - recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name + recipient_id = discord.utils.raw_mentions(recipient)[0] # First mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) - - (choice_name, choice_ingredients) = await self.util.get_drink() + recipient = discord.utils.escape_mentions(recipient.strip()) + + if not recipient: + return + drink: Optional[tuple] = await self.util.get_drink() + if not drink: + raise MiscException("Failed to get drink from db.") + (choice_name, choice_ingredients) = drink await ctx.respond(f"*is mixing up **{choice_name}** for {recipient.strip()}*") embed: discord.Embed = discord.Embed(title=f"Cocktail for {recipient}", description=choice_name) @@ -377,20 +413,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: await ctx.respond(f"*sprays **{recipient_normal}** with water*") await self.util.increment_counter("water_sprays") @@ -412,20 +452,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() - recipient_normal: str = ctx.author.mention + if not recipient: + recipient = authorDisplay.strip() + recipient_normal = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: await ctx.respond(f"*passes **{recipient_normal}** a barf bag*") await self.util.increment_counter("barf_bags") @@ -448,30 +492,34 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: if recipient == "rhodes": - tea: str = "a cup of Harney & Sons Hot Cinnamon Spice Tea" + tea = "a cup of Harney & Sons Hot Cinnamon Spice Tea" elif ctx.author.id == 992437729927376996 or recipient.lower() in ["kriegerin", "traurigkeit", "krieg", "kriegs", "cyberkrieg", "ck"]: - tea: str = "a cup of earl grey, light and sweet" + tea = "a cup of earl grey, light and sweet" response = await ctx.respond(f"*hands **{recipient_normal}** {tea}*") await self.util.increment_counter("teas") try: @@ -483,8 +531,8 @@ class Misc(commands.Cog): traceback.print_exc() return await ctx.respond(f"Failed: {str(e)}") - @bridge.bridge_command() - @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + @bridge.bridge_command() # type: ignore + @is_spamchan_or_drugs() async def cowsay(self, ctx, *, message: str) -> None: """ @@ -505,8 +553,8 @@ class Misc(commands.Cog): return await ctx.respond(f"Failed: {str(e)}") - @bridge.bridge_command() - @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + @bridge.bridge_command() # type: ignore + @is_spamchan_or_drugs() async def fortune(self, ctx, cowfile: Optional[str] = None) -> None: """ @@ -518,7 +566,7 @@ class Misc(commands.Cog): """ try: if not cowfile: - cowfile: str = random.choice(self.COWS).replace(".cow", "") + cowfile = random.choice(self.COWS).replace(".cow", "") if not f'{cowfile}.cow' in self.COWS: return await ctx.respond(f"Unknown cow {cowfile}, who dat?") @@ -531,8 +579,8 @@ class Misc(commands.Cog): traceback.print_exc() return await ctx.respond(f"Failed: {str(e)}") - @bridge.bridge_command() - @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + @bridge.bridge_command() # type: ignore + @is_spamchan_or_drugs() async def listcows(self, ctx) -> None: """ List available .cow files (for cowsay) @@ -544,7 +592,7 @@ class Misc(commands.Cog): cow_list: str = "" try: for cow in self.COWS: - cow: str = cow.replace(".cow", "") + cow = cow.replace(".cow", "") cow_list += f"- **{cow}**\n" embed: discord.Embed = discord.Embed(title="List of .cows", @@ -568,20 +616,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: response = await ctx.respond(f"*doses **{recipient_normal}** with Zyklon-B*") await self.util.increment_counter("cyanides") @@ -591,7 +643,7 @@ class Misc(commands.Cog): except Exception as e: logging.debug("Failed to add cynaide reaction: %s", str(e)) - except: + except Exception as e: traceback.print_exc() return await ctx.respond(f"Failed: {str(e)}") @@ -609,20 +661,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: response = await ctx.respond(f"*doses **{recipient_normal}** with school gravy*") await self.util.increment_counter("gravies") @@ -650,20 +706,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: response = await ctx.respond(f"*hands **{recipient_normal}** a cold glass of water*") await self.util.increment_counter("waters") @@ -692,20 +752,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: await ctx.respond(f"*shoves **{recipient_normal}** {chosen_fate}*") await self.util.increment_counter("shoves") @@ -727,22 +791,28 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: - chosen_coffee: str = self.util.get_coffee() + chosen_coffee: Optional[str] = self.util.get_coffee() + if not chosen_coffee: + return response = await ctx.respond(f"*hands **{recipient_normal}** {chosen_coffee}*") await self.util.increment_counter("coffees") try: @@ -768,24 +838,31 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() else: if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: - chosen_cookie: dict = await self.util.get_cookie() + chosen_cookie: Optional[dict] = await self.util.get_cookie() + if not chosen_cookie: + raise MiscException("Failed to get cookie from db.") embed: discord.Embed = discord.Embed(title=f"Cookie for {recipient}", description=f"Have a {chosen_cookie.get('name')}", colour=discord.Colour.orange(), image=chosen_cookie.get('image_url')) embed.add_field(name="Origin", - value=chosen_cookie.get('origin')) + value=chosen_cookie.get('origin', 'N/A')) await ctx.respond(embed=embed) except Exception as e: traceback.print_exc() @@ -803,20 +880,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: await ctx.respond(f"*hands **{recipient_normal}** 2 warm hashbrowns*") await self.util.increment_counter("hashbrowns") @@ -838,20 +919,24 @@ class Misc(commands.Cog): authorDisplay = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: response = await ctx.respond(f"*serves **{recipient_normal}** a plate of ritalini* 😉") await response.add_reaction(emoji="💊") @@ -875,20 +960,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: await ctx.respond(f"*hands **{recipient_normal}** a grilled cheese*") await self.util.increment_counter("grilled_cheeses") @@ -910,20 +999,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: await ctx.respond(f"*hands **{recipient_normal}** a hot bowl of soup*") await self.util.increment_counter("soups") @@ -945,18 +1038,22 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: recipient = discord.utils.escape_mentions(recipient.strip()) try: @@ -981,20 +1078,24 @@ class Misc(commands.Cog): authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: response = await ctx.respond(f"*hands **{recipient_normal}** a side of bacon*") await response.add_reaction(emoji="🥓") @@ -1017,20 +1118,24 @@ class Misc(commands.Cog): authorDisplay = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.author.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: response = await ctx.respond(f"*sends **{recipient_normal}** to the Gallows to be hanged asynchronely*") await self.util.increment_counter("hangings") @@ -1043,8 +1148,7 @@ class Misc(commands.Cog): await ctx.respond(f"Failed: {str(e)}") traceback.print_exc() return - - + @bridge.bridge_command() async def touch(self, ctx, *, recipient: Optional[str] = None) -> None: @@ -1056,9 +1160,12 @@ class Misc(commands.Cog): Returns: None """ + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return no_self_touch: str = ", don't fucking touch yourself here. You disgust me." - if recipient is None: + if not recipient: recipient_normal: str = ctx.author.mention await ctx.respond(f"{recipient_normal}{no_self_touch}") try: @@ -1066,18 +1173,21 @@ class Misc(commands.Cog): except Exception as e: logging.debug("Failed to add puke reactin for touch command: %s", str(e)) - return await self.util.increment_counter("touch_denials") + await self.util.increment_counter("touch_denials") else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: response = await ctx.respond(f"*touches **{recipient_normal}** for **{ctx.author.mention}** because they wouldn't touch them with a shitty stick!*") await self.util.increment_counter("touches") @@ -1090,8 +1200,8 @@ class Misc(commands.Cog): traceback.print_exc() return await ctx.respond(f"Failed: {str(e)}") - @bridge.bridge_command() - @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + @bridge.bridge_command() # type: ignore + @is_spamchan_or_drugs() async def qajoke(self, ctx) -> None: """ Get a joke in Q/A Form! @@ -1101,7 +1211,10 @@ class Misc(commands.Cog): None """ try: - (question, answer) = await self.util.get_qajoke() + qajoke: Optional[tuple] = await self.util.get_qajoke() + if not qajoke: + return + (question, answer) = qajoke escaped_question = discord.utils.escape_markdown(question) escasped_answer = discord.utils.escape_markdown(answer) embed: discord.Embed = discord.Embed(title=escaped_question, @@ -1110,8 +1223,8 @@ class Misc(commands.Cog): except Exception as e: await ctx.respond(f"Error: {str(e)}") - @bridge.bridge_command() - @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + @bridge.bridge_command() # type: ignore + @is_spamchan_or_drugs() async def rjoke(self, ctx) -> None: """ Get a joke! (from r/jokes scrape) @@ -1121,7 +1234,10 @@ class Misc(commands.Cog): None """ try: - (title, body, score) = await self.util.get_rjoke() + rjoke: Optional[tuple] = await self.util.get_rjoke() + if not rjoke: + raise MiscException("Failed to get rjoke from db.") + (title, body, score) = rjoke escaped_title = discord.utils.escape_markdown(title) escaped_body = discord.utils.escape_markdown(body) embed: discord.Embed = discord.Embed(title=escaped_title, @@ -1132,8 +1248,8 @@ class Misc(commands.Cog): traceback.print_exc() return await ctx.respond(f"Error: {str(e)}") - @bridge.bridge_command() - @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + @bridge.bridge_command() # type: ignore + @is_spamchan_or_drugs() async def joint(self, ctx, *, recipient: Optional[str] = None) -> None: """ @@ -1146,23 +1262,31 @@ class Misc(commands.Cog): """ authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ else ctx.message.author.display_name + - if recipient is None: - recipient: str = authorDisplay.strip() + if not recipient: + recipient = authorDisplay.strip() recipient_normal: str = ctx.user.mention else: - recipient_normal: str = recipient + recipient_normal = recipient if discord.utils.raw_mentions(recipient): # There are mentions recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention - recipient: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).display_name - recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ - .get_member(recipient_id).mention + guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id) + if not guild: + return + recipient_member: Optional[discord.Member] = guild.get_member(recipient_id) + if not recipient_member: + return + recipient = recipient_member.display_name + recipient_normal = recipient_member.mention else: - recipient: str = discord.utils.escape_mentions(recipient.strip()) + recipient = discord.utils.escape_mentions(recipient.strip()) try: - (choice_strain, choice_desc) = await self.util.get_strain() + strain: Optional[tuple] = await self.util.get_strain() + if not strain: + raise MiscException("Failed to get strain from db.") + (choice_strain, choice_desc) = strain escaped_description = discord.utils.escape_markdown(choice_desc.strip()) await ctx.send_followup(f"*hands **{recipient_normal}** a joint rolled up with some **{choice_strain}***", username="Joint Granter") embed: discord.Embed = discord.Embed(title=choice_strain, @@ -1239,8 +1363,8 @@ class Misc(commands.Cog): """ User Commands """ - @commands.user_command(name="Give Joint") - @is_spamchan() # pylint: disable=too-many-function-args + @commands.user_command(name="Give Joint") # type: ignore + @is_spamchan() async def joint_context_menu(self, ctx, member: discord.Member) -> None: """ Joint Context Menu @@ -1250,7 +1374,10 @@ class Misc(commands.Cog): Returns: None """ - (choice_strain, choice_desc) = await self.util.get_strain() + strain: Optional[tuple] = await self.util.get_strain() + if not strain: + raise MiscException("Failed to get strain from db.") + (choice_strain, choice_desc) = strain escaped_desc = discord.utils.escape_markdown(choice_desc.strip()) await ctx.interaction.respond(f"*hands **<@{member.id}>** a joint rolled up with some **{choice_strain}***") embed: discord.Embed = discord.Embed(title=choice_strain, @@ -1278,10 +1405,7 @@ class Misc(commands.Cog): def cog_unload(self) -> None: """Run on Cog Unload""" - try: - self.randstat_loop.cancel() - except Exception as e: - logging.debug("Failed to cancel randstat loop: %s", str(e)) + pass def setup(bot) -> None: """Run on Cog Load""" diff --git a/cogs/misc_util.py b/cogs/misc_util.py index f3346c4..0b824b9 100644 --- a/cogs/misc_util.py +++ b/cogs/misc_util.py @@ -6,7 +6,7 @@ import traceback import random import datetime import pytz -from typing import Optional, LiteralString +from typing import Any, Optional, LiteralString, Union import regex import aiosqlite as sqlite3 from aiohttp import ClientSession, ClientTimeout @@ -19,7 +19,7 @@ class Util: self.URL_URBANDICTIONARY: str = "http://api.urbandictionary.com/v0/define" self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult" self.COMPLIMENT_GENERATOR = ComplimentGenerator() - self.dbs: dict[str|LiteralString] = { + self.dbs: dict[str, str|LiteralString] = { 'whisky': os.path.join("/usr/local/share", "sqlite_dbs", "whiskey.db"), 'drinks': os.path.join("/usr/local/share", @@ -73,50 +73,42 @@ class Util: (mics, mils) = _t(td.microseconds, 1000) return (td.days, h, m, s, mics, mils) - def sqlite_dict_factory(self, cursor: sqlite3.Cursor, row: sqlite3.Row) -> dict: - """ - SQLite Dict Factory for Rows Returned - Args: - cursor (sqlite3.Row) - row (sqlite3.Row) - Returns: - dict - """ - fields = [column[0] for column in cursor.description] - return { key: value for key, value in zip(fields, row) } - - - async def get_counter(self, counter: Optional[str] = None) -> dict: + async def get_counter(self, counter: Optional[str] = None) -> Optional[dict]: """ Get Counter Args: counter (Optional[str]) Returns: - dict + Optional[dict] """ - async with sqlite3.connect(self.dbs.get('stats'), + stats_db: str|LiteralString = self.dbs.get('stats', '') + if not stats_db: + return None + async with sqlite3.connect(stats_db, timeout=3) as db_conn: - db_conn.row_factory = self.sqlite_dict_factory + db_conn.row_factory = sqlite3.Row query: str = "SELECT ? FROM stats LIMIT 1" if not counter: - query: str = "SELECT * FROM stats LIMIT 1" + query = "SELECT * FROM stats LIMIT 1" async with await db_conn.execute(query, (counter,) if counter else None) as db_cursor: - result: dict = await db_cursor.fetchone() + result = await db_cursor.fetchone() return result - async def get_stats_embed(self) -> Embed: + async def get_stats_embed(self) -> Optional[Embed]: """ Get Stats Embed Returns: - Embed + Optional[Embed] """ - counters: dict = await self.get_counter() + counters: Optional[dict] = await self.get_counter() + if not counters: + return None embed: Embed = Embed(title="Stats") counter_message: str = "" counters_sorted: dict = dict(sorted(counters.items(), key=lambda item: item[1], reverse=True)) for counter, value in counters_sorted.items(): - counter: str = regex.sub(r'_', ' ', + counter = regex.sub(r'_', ' ', counter.strip()).title() counter_message += f"- {value} {counter}\n" embed.description = counter_message.strip() @@ -130,7 +122,10 @@ class Util: Returns: bool """ - async with sqlite3.connect(self.dbs.get('stats'), + stats_db: str|LiteralString = self.dbs.get('stats', '') + if not stats_db: + return False + async with sqlite3.connect(stats_db, timeout=3) as db_conn: async with await db_conn.execute(f"UPDATE stats SET {counter} = {counter} + 1") as db_cursor: if db_cursor.rowcount < 0: @@ -139,11 +134,11 @@ class Util: await db_conn.commit() return True - async def get_ud_def(self, term: Optional[str] = None) -> tuple[str, str]: + async def get_ud_def(self, term: str) -> tuple[str, str]: """ Get Definition from UD Args: - term (Optional[str]) + term (str) Returns: tuple[str, str] """ @@ -202,93 +197,107 @@ class Util: return self.COMPLIMENT_GENERATOR.compliment(subject) return self.COMPLIMENT_GENERATOR.compliment_in_language(subject, language) - async def get_whisky(self) -> tuple: + async def get_whisky(self) -> Optional[tuple]: """ Get Whisky Returns: - tuple + Optional[tuple] """ - whisky_db: str|LiteralString = self.dbs.get('whisky') - db_conn = await sqlite3.connect(database=whisky_db, timeout=2) - db_query: str = "SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1" - db_cursor: sqlite3.Cursor = await db_conn.execute(db_query) - db_result: tuple = await db_cursor.fetchone() - - (name, category, description) = db_result - name: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '', - regex.sub(r'\p{White_Space}{2,}', ' ', - name.strip())) - category: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '', - regex.sub(r'\p{White_Space}{2,}', ' ', - category.strip())) - description: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '', + whisky_db: str|LiteralString = self.dbs.get('whisky', '') + if not whisky_db: + return None + async with sqlite3.connect(database=whisky_db, + timeout=2) as db_conn: + db_query: str = "SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1" + async with await db_conn.execute(db_query) as db_cursor: + db_result: Optional[Union[sqlite3.Row, tuple]] = await db_cursor.fetchone() + if not db_result: + return None + (name, category, description) = db_result + name = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', - description.strip())) - return (name, category, description) + name.strip())) + category = regex.sub(r'(^\p{White_Space}|\r|\n)', '', + regex.sub(r'\p{White_Space}{2,}', ' ', + category.strip())) + description = regex.sub(r'(^\p{White_Space}|\r|\n)', '', + regex.sub(r'\p{White_Space}{2,}', ' ', + description.strip())) + return (name, category, description) - async def get_drink(self) -> tuple: + async def get_drink(self) -> Optional[tuple]: """ Get Drink Returns: - tuple + Optional[tuple] """ - drinks_db: str|LiteralString = self.dbs.get('drinks') - db_conn = await sqlite3.connect(database=drinks_db, timeout=2) - db_query: str = "SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1" - db_cursor: sqlite3.Cursor = await db_conn.execute(db_query) - db_result: tuple = await db_cursor.fetchone() + drinks_db: str|LiteralString = self.dbs.get('drinks', '') + if not drinks_db: + return None + async with sqlite3.connect(database=drinks_db, + timeout=2) as db_conn: + db_query: str = "SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1" + async with await db_conn.execute(db_query) as db_cursor: + db_result: tuple = await db_cursor.fetchone() + + (name, ingredients) = db_result + name = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', name.strip())) + ingredients = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', ingredients.strip())) + ingredients = regex.sub(r'\*', '\u2731', ingredients.strip()) + return (name, ingredients) - (name, ingredients) = db_result - name = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', name.strip())) - ingredients = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', ingredients.strip())) - ingredients = regex.sub(r'\*', '\u2731', ingredients.strip()) - - return (name, ingredients) - - async def get_strain(self, strain: Optional[str] = None) -> tuple: + async def get_strain(self, strain: Optional[str] = None) -> Optional[tuple]: """ Get Strain Args: strain (Optional[str]) Returns: - tuple + Optional[tuple] """ - strains_db: str|LiteralString = self.dbs.get('strains') - db_conn = await sqlite3.connect(database=strains_db, timeout=2) - db_params: Optional[tuple] = None - if not strain: - db_query: str = "SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1" - else: - db_query: str = "SELECT name, description FROM strains_w_desc WHERE name LIKE ?" - db_params: tuple = (f"%{strain.strip()}%",) - - db_cursor: sqlite3.Cursor = await db_conn.execute(db_query, db_params) - db_result: tuple = await db_cursor.fetchone() + strains_db: str|LiteralString = self.dbs.get('strains', '') + if not strains_db: + return None + async with sqlite3.connect(database=strains_db, + timeout=2) as db_conn: + db_params: Optional[tuple] = None + if not strain: + db_query: str = "SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1" + else: + db_query = "SELECT name, description FROM strains_w_desc WHERE name LIKE ?" + db_params = (f"%{strain.strip()}%",) + async with await db_conn.execute(db_query, db_params) as db_cursor: + db_result: Optional[tuple] = await db_cursor.fetchone() + return db_result - return db_result - - async def get_qajoke(self) -> tuple: + async def get_qajoke(self) -> Optional[tuple]: """ Get QA Joke Returns: - tuple + Optional[tuple] """ - qajoke_db: str|LiteralString = self.dbs.get('qajoke') - async with sqlite3.connect(database=qajoke_db, timeout=2) as db: - async with await db.execute('SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1') as cursor: + qajoke_db: str|LiteralString = self.dbs.get('qajoke', '') + if not qajoke_db: + return None + async with sqlite3.connect(database=qajoke_db, + timeout=2) as db_conn: + db_query: str = "SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1" + async with await db_conn.execute(db_query) as cursor: (question, answer) = await cursor.fetchone() return (question, answer) return None - async def get_rjoke(self) -> tuple: + async def get_rjoke(self) -> Optional[tuple]: """ Get r/joke Joke Returns: - tuple + Optional[tuple] """ - rjokes_db: str|LiteralString = self.dbs.get('rjokes') - async with sqlite3.connect(database=rjokes_db, timeout=2) as db: - async with await db.execute('SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1') as cursor: + rjokes_db: str|LiteralString = self.dbs.get('rjokes', '') + if not rjokes_db: + return None + async with sqlite3.connect(database=rjokes_db, timeout=2) as db_conn: + db_query: str = "SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1'" + async with await db_conn.execute(db_query) as cursor: (title, body, score) = await cursor.fetchone() return (title, body, score) return None @@ -307,29 +316,34 @@ class Util: try: async with await client.get(facts_api_url, timeout=ClientTimeout(connect=5, sock_read=5)) as request: - json: dict = await request.json() - fact: str = json.get('text') + _json: dict = await request.json() + fact: str = _json.get('text', None) if not fact: raise BaseException("RandFact Src 1 Failed") return fact except: async with await client.get(facts_backup_url, timeout=ClientTimeout(connect=5, sock_read=5)) as request: - json: dict = await request.json() - fact: str = json.get('fact') + _json = await request.json() + fact = _json.get('fact', None) return fact except Exception as e: traceback.print_exc() return f"Failed to get a random fact :( [{str(e)}]" - async def get_cookie(self) -> dict: + async def get_cookie(self) -> Optional[dict]: """ Get Cookie Returns: - dict + Optional[dict] """ - async with sqlite3.connect(self.dbs.get('cookies'), timeout=2) as db_conn: - async with await db_conn.execute("SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1") as db_cursor: + cookies_db = self.dbs.get('cookies', '') + if not cookies_db: + return None + async with sqlite3.connect(cookies_db, + timeout=2) as db_conn: + db_query: str = "SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1" + async with await db_conn.execute(db_query) as db_cursor: (name, origin, image_url) = await db_cursor.fetchone() return { 'name': name, @@ -338,7 +352,7 @@ class Util: } - def get_coffee(self) -> str: + def get_coffee(self) -> Optional[str]: """ Get Coffee Returns: @@ -354,16 +368,16 @@ class Util: return randomCoffee except: traceback.print_exc() - return False + return None - def get_days_to_xmas(self) -> tuple[int|float]: + def get_days_to_xmas(self) -> Optional[tuple]: """ Get # of Days until Xmas Returns: - tuple[int|float] + Optional[tuple] """ - today: datetime = datetime.datetime.now(tz=pytz.UTC) - xmas: datetime = datetime.datetime( + today: datetime.datetime = datetime.datetime.now(tz=pytz.UTC) + xmas: datetime.datetime = datetime.datetime( year=today.year, month=12, day=25, @@ -374,15 +388,18 @@ class Util: return (days, hours, minutes, seconds, ms, us) - async def get_randmsg(self) -> str: + async def get_randmsg(self) -> Optional[str]: """ Get Random Message from randmsg.db Returns: - str + Optional[str] """ - randmsg_db = self.dbs.get('randmsg') + randmsg_db: str|LiteralString = self.dbs.get('randmsg', '') + if not randmsg_db: + return None async with sqlite3.connect(database=randmsg_db, timeout=2) as db_conn: - async with await db_conn.execute("SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1") as db_cursor: + db_query: str = "SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1" + async with await db_conn.execute(db_query) as db_cursor: (result,) = await db_cursor.fetchone() return result \ No newline at end of file diff --git a/cogs/quote.py b/cogs/quote.py deleted file mode 100644 index dd3b938..0000000 --- a/cogs/quote.py +++ /dev/null @@ -1,321 +0,0 @@ -#!/usr/bin/env python3.12 -# pylint: disable=bare-except, broad-exception-caught - -""" -Quote cog for Havoc -""" - -import traceback -import time -import os -import datetime -import asyncio -import discord -import aiosqlite as sqlite3 -from discord.ext import bridge, commands -from disc_havoc import Havoc - -class DB: - """DB Utility for Quote Cog""" - def __init__(self, bot: Havoc): - self.bot: Havoc = bot - self.db_path = os.path.join("/", "usr", "local", "share", - "sqlite_dbs", "quotes.db") - self.hp_chanid = 1157529874936909934 - - - async def get_quote_count(self): - """Get Quote Count""" - async with sqlite3.connect(self.db_path, timeout=2) as db_conn: - async with await db_conn.execute("SELECT COUNT (*) FROM quotes") as db_cursor: - result = await db_cursor.fetchone() - return result[-1] - - async def remove_quote(self, quote_id: int): - """Remove Quote from DB""" - try: - async with sqlite3.connect(self.db_path, timeout=2) as db_conn: - async with await db_conn.execute("DELETE FROM quotes WHERE id = ?", (quote_id,)) as _: - await db_conn.commit() - return True - except: - await self.bot.get_channel(self.hp_chanid).send(traceback.format_exc()) - return False - - async def add_quote(self, message_id: int, channel_id: int, - quoted_member_id: int, - message_time: int, - quoter_friendly: str, - quoted_friendly: str, - channel_friendly: str, - message_content: str, - ): - """Add Quote to DB""" - params = ( - quoter_friendly, - int(time.time()), - quoted_friendly, - quoted_member_id, - channel_friendly, - channel_id, - message_id, - message_time, - quoter_friendly, - message_content, - ) - - try: - async with sqlite3.connect(self.db_path, timeout=2) as db_conn: - # pylint: disable=line-too-long - db_conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)]) - async with await db_conn.execute("INSERT INTO quotes (added_by, added_at, quoted_user_display, quoted_user_memberid, quoted_channel_display, quoted_channel_id, quoted_message_id, quoted_message_time, added_by_friendly, quoted_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - params) as _: # pylint: enable=line-too-long - await db_conn.commit() - return True - except: - return traceback.format_exc() - - async def fetch_quote(self, random: bool = False, quoteid: int = None, added_by: str = None, quoted_user: str = None, content: str = None): - """Fetch Quote from DB""" - try: - query_head = "SELECT id, added_by_friendly, added_at, quoted_user_display, quoted_channel_display, quoted_message_time, quoted_message FROM quotes" - query = "" - params = None - - if random: - query = f"{query_head} ORDER BY RANDOM() LIMIT 1" - elif quoteid: - query = f"{query_head} WHERE id = ? LIMIT 1" - params = (quoteid,) - elif added_by: - query = f"{query_head} WHERE added_by_friendly LIKE ? ORDER BY RANDOM() LIMIT 5" - params = (f"%{added_by}%",) - elif quoted_user: - query = f"{query_head} WHERE quoted_user_display LIKE ? ORDER BY RANDOM() LIMIT 5" - params = (f"%{quoted_user}%",) - elif content: - query = f"{query_head} WHERE quoted_message LIKE ? ORDER BY RANDOM() LIMIT 5" - params = (f"%{content}%",) - - async with sqlite3.connect(self.db_path, timeout=2) as db_conn: - db_conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)]) - async with await db_conn.execute(query, params) as db_cursor: - results = await db_cursor.fetchall() - if not results: - return { - 'err': 'No results for query', - } - if random or quoteid: - chosen = results[-1] - return { - str(k): v for k,v in chosen.items() - } - else: - return [ - { str(k): v for k,v in _.items() } - for _ in results - ] - except: - return traceback.format_exc() - - -class Quote(commands.Cog): - """Quote Cog for Havoc""" - def __init__(self, bot: Havoc): - self.bot: Havoc = bot - self.db = DB(self.bot) - - def is_homeserver(): # pylint: disable=no-method-argument - """Check if channel/interaction is within homeserver""" - def predicate(ctx): - try: - return ctx.guild.id == 1145182936002482196 - except: - traceback.print_exc() - return False - return commands.check(predicate) - - @commands.message_command(name="Add Quote") - async def add_quote(self, ctx, message: discord.Message): - """Add A Quote""" - hp_chanid = 1157529874936909934 - try: - if message.author.bot: - return await ctx.respond("Quotes are for real users, not bots.", ephemeral=True) - quoter_friendly = ctx.author.display_name - quoted_message_id = message.id - quoted_channel_friendly = f'#{ctx.channel.name}' - quoted_channel_id = ctx.channel.id - message_content = message.content - message_author_friendly = message.author.display_name - message_author_id = message.author.id - message_time = int(message.created_at.timestamp()) - message_escaped = discord.utils.escape_mentions(discord.utils.escape_markdown(message_content)).strip() - - if len(message_escaped) < 3: - return await ctx.respond("**Error**: Message (text content) is not long enough to quote.", ephemeral=True) - - if len(message_escaped) > 512: - return await ctx.respond("**Error**: Message (text content) is too long to quote.", ephemeral=True) - - result = await self.db.add_quote(message_id=quoted_message_id, - channel_id=quoted_channel_id, - quoted_member_id=message_author_id, - message_time=message_time, - quoter_friendly=quoter_friendly, - quoted_friendly=message_author_friendly, - channel_friendly=quoted_channel_friendly, - message_content=message_content) - if not result: - return await ctx.respond("Failed!", ephemeral=True) - else: - return await ctx.respond("OK!", ephemeral=True) - except: - await self.bot.get_channel(hp_chanid).send(traceback.format_exc()) - - - @bridge.bridge_command(aliases=['rand']) - @is_homeserver() # pylint: disable=too-many-function-args - async def randquote(self, ctx): - """Get a random quote""" - try: - random_quote = await self.db.fetch_quote(random=True) - if random_quote.get('err'): - return await ctx.respond("Failed to get a quote") - - quote_id = random_quote.get('id') - quoted_friendly = random_quote.get('quoted_user_display', 'Unknown') - adder_friendly = random_quote.get('added_by_friendly', 'Unknown') - message_time = datetime.datetime.fromtimestamp(random_quote.get('quoted_message_time')) - message_channel = random_quote.get('quoted_channel_display') - quote_added_at = datetime.datetime.fromtimestamp(random_quote.get('added_at')) - quote_content = random_quote.get('quoted_message') - - embed = discord.Embed( - colour=discord.Colour.orange(), - title=f"Quote #{quote_id}", - ) - embed.description = f"**{quoted_friendly}:** {quote_content}" - embed.add_field(name="Original Message Time", value=message_time) - embed.add_field(name="Channel", value=message_channel) - embed.add_field(name="Quote ID", value=quote_id) - embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") - - return await ctx.respond(embed=embed) - except: - error = await ctx.respond(traceback.format_exc()) - await asyncio.sleep(10) - await error.delete() - - @bridge.bridge_command(aliases=['qg']) - @is_homeserver() # pylint: disable=too-many-function-args - async def quoteget(self, ctx, quoteid): - """Get a specific quote by ID""" - try: - if not str(quoteid).strip().isnumeric(): - return await ctx.respond("**Error**: Quote ID must be numeric.") - fetched_quote = await self.db.fetch_quote(quoteid=quoteid) - if fetched_quote.get('err'): - return await ctx.respond("**Error**: Quote not found") - - quote_id = fetched_quote.get('id') - quoted_friendly = fetched_quote.get('quoted_user_display', 'Unknown') - adder_friendly = fetched_quote.get('added_by_friendly', 'Unknown') - message_time = datetime.datetime.fromtimestamp(fetched_quote.get('quoted_message_time')) - message_channel = fetched_quote.get('quoted_channel_display') - quote_added_at = datetime.datetime.fromtimestamp(fetched_quote.get('added_at')) - quote_content = fetched_quote.get('quoted_message') - - embed = discord.Embed( - colour=discord.Colour.orange(), - title=f"Quote #{quote_id}", - ) - embed.description = f"**{quoted_friendly}:** {quote_content}" - embed.add_field(name="Original Message Time", value=message_time) - embed.add_field(name="Channel", value=message_channel) - embed.add_field(name="Quote ID", value=quote_id) - embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") - - return await ctx.respond(embed=embed) - except: - error = await ctx.respond(traceback.format_exc()) - await asyncio.sleep(10) - await error.delete() - - @bridge.bridge_command(aliases=['qs']) - @is_homeserver() # pylint: disable=too-many-function-args - async def quotesearch(self, ctx, *, content: str): - """Search for a quote (by content)""" - try: - found_quotes = await self.db.fetch_quote(content=content) - if isinstance(found_quotes, dict) and found_quotes.get('err'): - return await ctx.respond(f"Quote search failed: {found_quotes.get('err')}") - - embeds = [] - - for quote in found_quotes: - quote_id = quote.get('id') - quoted_friendly = quote.get('quoted_user_display', 'Unknown') - adder_friendly = quote.get('added_by_friendly', 'Unknown') - message_time = datetime.datetime.fromtimestamp(quote.get('quoted_message_time')) - message_channel = quote.get('quoted_channel_display') - quote_added_at = datetime.datetime.fromtimestamp(quote.get('added_at')) - quote_content = quote.get('quoted_message') - - # await ctx.respond(f"**{quoted_friendly}**: {quote}")ed_friendly = quote.get('quoted_user_display', 'Unknown') - adder_friendly = quote.get('added_by_friendly', 'Unknown') - message_time = datetime.datetime.fromtimestamp(quote.get('quoted_message_time')) - message_channel = quote.get('quoted_channel_display') - quote_added_at = datetime.datetime.fromtimestamp(quote.get('added_at')) - quote = quote.get('quoted_message') - - # await ctx.respond(f"**{quoted_friendly}**: {quote}") - embed = discord.Embed( - colour=discord.Colour.orange(), - title=f"Quote #{quote_id}", - ) - embed.description = f"**{quoted_friendly}:** {quote_content}" - embed.add_field(name="Original Message Time", value=message_time) - embed.add_field(name="Channel", value=message_channel) - embed.add_field(name="Quote ID", value=quote_id) - embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") - embeds.append(embed) - - return await ctx.respond(embeds=embeds) - except Exception as e: - await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") - - @bridge.bridge_command(aliases=['nq']) - @is_homeserver() # pylint: disable=too-many-function-args - async def nquotes(self, ctx): - """Get # of quotes stored""" - try: - quote_count = await self.db.get_quote_count() - if not quote_count: - return await ctx.respond("**Error**: No quotes found!") - return await ctx.respond(f"I currently have **{quote_count}** quotes stored.") - - except Exception as e: - await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") - - @bridge.bridge_command(aliases=['qr']) - @commands.is_owner() - @is_homeserver() # pylint: disable=too-many-function-args - async def quoteremove(self, ctx, quoteid): - """Remove a quote (by id) - Owner only""" - try: - if not str(quoteid).strip().isnumeric(): - return await ctx.respond("**Error**: Quote ID must be numeric.") - quoteid = int(quoteid) - remove_quote = await self.db.remove_quote(quoteid) - if not remove_quote: - return await ctx.respond("**Error**: Failed!", ephemeral=True) - return await ctx.respond("Removed!", ephemeral=True) - - except Exception as e: - await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") - -def setup(bot): - """Run on Cog Load""" - bot.add_cog(Quote(bot)) diff --git a/cogs/radio.py b/cogs/radio.py index c8c6b0c..ed600c8 100644 --- a/cogs/radio.py +++ b/cogs/radio.py @@ -29,7 +29,7 @@ class Radio(commands.Cog): """Run on Bot Ready""" await self.radio_init() - def is_radio_chan(): # pylint: disable=no-method-argument + def is_radio_chan(): # type: ignore """Check if channel is radio chan""" def predicate(ctx): try: @@ -59,8 +59,12 @@ class Radio(commands.Cog): """ try: (radio_guild, radio_chan) = self.channels['sfm'] - channel: discord.TextChannel = self.bot.get_guild(radio_guild)\ - .get_channel(radio_chan) + guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild) + if not guild: + return + channel = guild.get_channel(radio_chan) + if not isinstance(channel, discord.VoiceChannel): + return if not self.bot.voice_clients: await channel.connect() try: @@ -84,24 +88,35 @@ class Radio(commands.Cog): try: (radio_guild, radio_chan) = self.channels['sfm'] try: - vc: discord.VoiceClient = self.bot.voice_clients[-1] + vc: discord.VoiceProtocol = self.bot.voice_clients[-1] except: logging.debug("No voice client, establishing new VC connection...") - channel = self.bot.get_guild(radio_guild)\ - .get_channel(radio_chan) + guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild) + if not guild: + return + channel = guild.get_channel(radio_chan) + if not isinstance(channel, discord.VoiceChannel): + return await channel.connect() vc = self.bot.voice_clients[-1] - if not(vc.is_playing()) or vc.is_paused(): + + if not vc.is_playing() or vc.is_paused(): # type: ignore + """ + Mypy does not seem aware of the is_playing, play, and is_paused methods, + but they exist. + """ logging.info("Detected VC not playing... playing!") source = discord.FFmpegOpusAudio(self.STREAM_URL, before_options="-timeout 3000000") - vc.play(source, after=lambda e: logging.info("Error: %s", e)\ - if e else None) + + vc.play(source, after=lambda e: logging.info("Error: %s", e) if e else None) # type: ignore + # Get Now Playing np_track = await get_now_playing() if np_track and not self.LAST_NP_TRACK == np_track: - self.LAST_NP_TRACK: str = np_track - await vc.channel.set_status(f"Now playing: {np_track}") + self.LAST_NP_TRACK = np_track + if isinstance(vc.channel, discord.VoiceChannel): + await vc.channel.set_status(f"Now playing: {np_track}") except: traceback.print_exc() diff --git a/constructors.py b/constructors.py index a935ce0..175493a 100644 --- a/constructors.py +++ b/constructors.py @@ -13,3 +13,12 @@ LoveHate class LoveHateException(Exception): """Love Hate Exception (generic)""" pass + + +""" +Misc +""" + +class MiscException(Exception): + """Misc Exception (generic)""" + pass \ No newline at end of file diff --git a/disc_havoc.py b/disc_havoc.py index f6bed00..76e0f82 100644 --- a/disc_havoc.py +++ b/disc_havoc.py @@ -29,7 +29,6 @@ cogs_list: list[str] = [ 'meme', 'karma', 'lovehate', - 'quote', 'radio', ] @@ -46,6 +45,8 @@ class Havoc(bridge.Bot): owner_ids=OWNERS, activity=bot_activity, help_command=commands.MinimalHelpCommand()) self.BOT_CHANIDS = BOT_CHANIDS + self.load_exts() + def load_exts(self, initialRun: Optional[bool] = True) -> None: """ diff --git a/util/lovehate_db.py b/util/lovehate_db.py index 04726b0..4458f72 100644 --- a/util/lovehate_db.py +++ b/util/lovehate_db.py @@ -31,13 +31,13 @@ class DB: if hates and loves: raise LoveHateException("Both hates and loves may not be True") elif hates: - flag: int = -1 + flag = -1 elif loves: - flag: int = 1 + flag = 1 elif not hates and not loves: raise LoveHateException("Neither loves nor hates were requested") - params: tuple = (thing, flag,) + params = (thing, flag,) async with sqlite3.connect(self.db_path, timeout=2) as db_conn: async with await db_conn.execute(query, params) as db_cursor: result: list[tuple] = await db_cursor.fetchall() @@ -47,7 +47,7 @@ class DB: return result async def get_lovehates(self, loves: bool = False, hates: bool = False, - user: str = None, thing: str = None) -> list[tuple]|bool: + user: Optional[str] = None, thing: Optional[str] = None) -> list[tuple]|bool: """ Get a list of either 1) what {user} loves/hates, or who loves/hates {thing}, depending on bools loves, hates Args: @@ -70,18 +70,18 @@ class DB: if hates and loves: raise LoveHateException("Both hates and loves may not be True") elif hates: - flag: int = -1 + flag = -1 elif loves: - flag: int = 1 + flag = 1 elif not hates and not loves: raise LoveHateException("Neither loves nor hates were requested") if user: - query: str = "SELECT thing FROM lovehate WHERE display_name LIKE ? AND flag == ?" - params: tuple = (user, flag,) + query = "SELECT thing FROM lovehate WHERE display_name LIKE ? AND flag == ?" + params = (user, flag,) elif thing: - query: str = "SELECT display_name FROM lovehate WHERE thing LIKE ? AND flag == ?" - params: tuple = (thing, flag,) + query = "SELECT display_name FROM lovehate WHERE thing LIKE ? AND flag == ?" + params = (thing, flag,) async with sqlite3.connect(self.db_path, timeout=2) as db_conn: async with await db_conn.execute(query, params) as db_cursor: @@ -127,21 +127,21 @@ class DB: db_query: str = "" params: tuple = (user, thing,) - already_opinionated: bool = await self.check_existence(user, thing) + already_opinionated: Optional[int] = await self.check_existence(user, thing) if already_opinionated: if flag == 0: - db_query: str = "DELETE FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?" + db_query = "DELETE FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?" else: loves_or_hates: str = "loves" if already_opinionated == -1: - loves_or_hates: str = "hates" + loves_or_hates = "hates" raise LoveHateException(f"But {user} already {loves_or_hates} {thing}...") else: match flag: case -1: - db_query: str = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, -1, ?)" + db_query = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, -1, ?)" case 1: - db_query: str = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, 1, ?)" + db_query = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, 1, ?)" case _: raise LoveHateException("Unknown error, default case matched")