diff --git a/cogs/karma.py b/cogs/karma.py index 36acb92..f9a0d3a 100644 --- a/cogs/karma.py +++ b/cogs/karma.py @@ -14,14 +14,14 @@ import regex from regex import Pattern from aiohttp import ClientSession, ClientTimeout from discord.ext import bridge, commands, tasks - +from disc_havoc import Havoc class Util: """Karma Utility""" - def __init__(self, bot): - self.bot = bot + def __init__(self, bot: Havoc): + self.bot: Havoc = bot self.api_key: str = constants.PRV_API_KEY self.karma_endpoints_base_url: str = "https://api.codey.lol/karma/" self.karma_retrieval_url: str = f"{self.karma_endpoints_base_url}get" @@ -146,9 +146,9 @@ class Util: class Karma(commands.Cog): """Karma Cog for Havoc""" - def __init__(self, bot): + def __init__(self, bot: Havoc): importlib.reload(constants) - self.bot: discord.Bot = bot + self.bot: Havoc = bot self.util = Util(self.bot) # self.karma_regex = regex.compile(r'(\w+)(\+\+|\-\-)') self.karma_regex: Pattern = regex.compile(r'(\b\w+(?:\s+\w+)*)(\+\+($|\s)|\-\-($|\s))') diff --git a/cogs/lovehate.py b/cogs/lovehate.py index f7818fa..84105b1 100644 --- a/cogs/lovehate.py +++ b/cogs/lovehate.py @@ -10,12 +10,12 @@ import aiosqlite as sqlite3 from discord.ext import bridge, commands from util.lovehate_db import DB from constructors import LoveHateException - +from disc_havoc import Havoc class LoveHate(commands.Cog): """LoveHate Cog for Havoc""" - def __init__(self, bot): - self.bot: discord.Bot = bot + def __init__(self, bot: Havoc) -> None: + self.bot: Havoc = bot self.db = DB(self.bot) def join_with_and(self, items: list) -> str: diff --git a/cogs/meme.py b/cogs/meme.py index 6b7d3df..1cfee45 100644 --- a/cogs/meme.py +++ b/cogs/meme.py @@ -6,12 +6,13 @@ import json import io import asyncio import random -from typing import LiteralString, Optional +from typing import LiteralString, Optional, Any import logging import textwrap import regex import requests import discord +from disc_havoc import Havoc from aiohttp import ClientSession from discord.ext import bridge, commands, tasks from jesusmemes import JesusMemeGenerator @@ -56,6 +57,8 @@ class MemeView(discord.ui.View): async def select_callback(self, select: discord.ui.Select, interaction: discord.Interaction) -> None: """Meme Selection Callback""" + if not isinstance(select.values[0], str): + return modal: discord.ui.Modal = MemeModal(meme=select.values[0], title="Meme Selected") await interaction.response.send_modal(modal) @@ -63,7 +66,7 @@ class MemeModal(discord.ui.Modal): """Meme Creation discord.ui.Modal""" def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None: super().__init__(*args, **kwargs) - self.selected_meme: str = meme + self.selected_meme: Optional[str] = meme self.meme_generator = JesusMemeGenerator() self.TEXT_LIMIT: int = 80 @@ -73,7 +76,14 @@ class MemeModal(discord.ui.Modal): style=discord.InputTextStyle.singleline)) async def callback(self, interaction: discord.Interaction) -> None: + if not self.selected_meme: # No meme selected + return selected_meme: str = self.selected_meme + if not self.children or len(self.children) < 2: # Invalid request + return + if not isinstance(self.children[0].value, str)\ + or not isinstance(self.children[1].value, str): # Invalid request + return meme_top_line: str = self.children[0].value.strip() meme_bottom_line: str = self.children[1].value.strip() if len(meme_top_line) > self.TEXT_LIMIT or len(meme_bottom_line) > self.TEXT_LIMIT: @@ -85,18 +95,18 @@ class MemeModal(discord.ui.Modal): embed: discord.Embed = discord.Embed(title="Generated Meme") embed.set_image(url=meme_link) - embed.add_field(name="Meme", value=self.selected_meme, inline=True) + embed.add_field(name="Meme", value=selected_meme, inline=True) await interaction.response.send_message(embeds=[embed]) return class Meme(commands.Cog): """Meme Cog for Havoc""" - def __init__(self, bot) -> None: - self.bot: discord.Bot = bot + def __init__(self, bot: Havoc) -> None: + self.bot: Havoc = bot self.meme_choices: list = [] self.meme_counter: int = 0 - self.THREADS: dict[dict[list]] = { + self.THREADS: dict[str, dict[int, list]] = { # Format: Guild1: [ChanId : [Webhook, ThreadId], Guild2: [ChanId : [Webhook, ThreadId] 'comic_explosm': { 1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367], @@ -120,7 +130,7 @@ class Meme(commands.Cog): } } - self.NO_THREAD_WEBHOOKS: dict[list] = { + self.NO_THREAD_WEBHOOKS: dict[str, list] = { 'theonion': [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2], 'thn': [constants.THN_WEBHOOK], 'memes': [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2], @@ -132,7 +142,7 @@ class Meme(commands.Cog): self.meme_stream_loop.start() self.explosm_loop.start() - def is_spamchan() -> bool: # pylint: disable=no-method-argument + def is_spamchan() -> bool: # type: ignore """Check if channel is spamchan""" def predicate(ctx): try: @@ -142,7 +152,7 @@ class Meme(commands.Cog): except: traceback.print_exc() return False - return commands.check(predicate) + return commands.check(predicate) # type: ignore @@ -163,39 +173,43 @@ class Meme(commands.Cog): dino_grabber = dinog.DinosaurGrabber() onion_grabber = oniong.OnionGrabber() thn_grabber = thng.THNGrabber() - explosm_comics = xkcd_comics = smbc_comics = qc_comics = dino_comics\ - = onions = thns = [] - memes: list[tuple] = await meme_grabber.get() + explosm_comics: list[Optional[tuple]] = [] + xkcd_comics: list[Optional[tuple]] = [] + smbc_comics: list[Optional[tuple]] = [] + dino_comics: list[Optional[tuple]] = [] + onions: list[Optional[tuple]] = [] + thns: list[Optional[tuple]] = [] + memes: list[Optional[tuple]] = await meme_grabber.get() try: try: - explosm_comics: list[tuple] = await explosm_grabber.get() + explosm_comics = await explosm_grabber.get() except: pass try: - xkcd_comics: list[tuple] = await xkcd_grabber.get() + xkcd_comics = await xkcd_grabber.get() except: pass try: - smbc_comics: list[tuple] = await smbc_grabber.get() + smbc_comics = await smbc_grabber.get() except: pass try: - qc_comics: list[tuple] = await qc_grabber.get() + qc_comics = await qc_grabber.get() print(f"QC: {qc_comics}") except: pass try: - dino_comics: list[tuple] = await dino_grabber.get() + dino_comics = await dino_grabber.get() except Exception as e: logging.debug("Dino failed: %s", str(e)) pass try: - onions: list[tuple] = await onion_grabber.get() + onions = await onion_grabber.get() except Exception as e: logging.debug("Onion failed: %s", str(e)) pass try: - thns: list[tuple] = await thn_grabber.get() + thns = await thn_grabber.get() except Exception as e: logging.debug("THNs failed: %s", str(e)) pass @@ -208,12 +222,14 @@ class Meme(commands.Cog): if not only_comics: try: for meme in memes: + if not meme: + continue (meme_id, meme_title, meme_url) = meme # pylint: disable=unused-variable request = requests.get(meme_url, stream=True, timeout=(5, 30), headers=headers) if not request.status_code == 200: continue meme_content: bytes = request.raw.read() - for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes'): + for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes', {}): meme_image: io.BytesIO = io.BytesIO(meme_content) ext: str = meme_url.split(".")[-1]\ .split("?")[0].split("&")[0] @@ -227,24 +243,27 @@ class Meme(commands.Cog): pass try: for comic in explosm_comics: + if not comic: + continue (comic_title, comic_url) = comic - comic_title: str = discord.utils.escape_markdown(comic_title) + comic_title = discord.utils.escape_markdown(comic_title) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() comic_content: bytes = comic_request.raw.read() - ext: str = comic_url.split(".")[-1]\ + ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: - for chanid, _hook in self.THREADS.get('comic_explosm').items(): + for chanid, _hook in self.THREADS.get('comic_explosm', {}).items(): comic_image: io.BytesIO = io.BytesIO(comic_content) channel: int = chanid - hook_uri: str = _hook[0] - thread_id: int = _hook[1] - webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + (hook_uri, thread_id) = _hook + webhook = discord.Webhook.from_url(hook_uri, session=session) - thread: discord.Thread = self.bot.get_channel(channel)\ - .get_thread(thread_id) + _channel: Any = self.bot.get_channel(channel) + if not _channel: + return + thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="Cyanide & Happiness", thread=thread) await asyncio.sleep(2) @@ -252,25 +271,28 @@ class Meme(commands.Cog): pass try: for comic in xkcd_comics: + if not comic: + continue (comic_title, comic_url) = comic - comic_title: str = discord.utils.escape_markdown(comic_title) + comic_title = discord.utils.escape_markdown(comic_title) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() - comic_content: bytes = comic_request.raw.read() - comic_image: io.BytesIO = io.BytesIO(comic_request.raw.read()) - ext: str = comic_url.split(".")[-1]\ + comic_content = comic_request.raw.read() + comic_image = io.BytesIO(comic_request.raw.read()) + ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: - for chanid, _hook in self.THREADS.get('comic_xkcd').items(): - comic_image: io.BytesIO = io.BytesIO(comic_content) - channel: int = chanid - hook_uri: str = _hook[0] - thread_id: int = _hook[1] - webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + for chanid, _hook in self.THREADS.get('comic_xkcd', {}).items(): + comic_image = io.BytesIO(comic_content) + channel = chanid + (hook_uri, thread_id) = _hook + webhook = discord.Webhook.from_url(hook_uri, session=session) - thread: discord.Thread = self.bot.get_channel(channel)\ - .get_thread(thread_id) + _channel = self.bot.get_channel(channel) + if not _channel: + return + thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="xkcd", thread=thread) await asyncio.sleep(2) @@ -278,24 +300,27 @@ class Meme(commands.Cog): pass try: for comic in smbc_comics: + if not comic: + continue (comic_title, comic_url) = comic - comic_title: str = discord.utils.escape_markdown(comic_title) + comic_title = discord.utils.escape_markdown(comic_title) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() - comic_content: bytes = comic_request.raw.read() - ext: str = comic_url.split(".")[-1]\ + comic_content = comic_request.raw.read() + ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: - for chanid, _hook in self.THREADS.get('comic_smbc').items(): - comic_image: io.BytesIO = io.BytesIO(comic_content) - channel: int = chanid - hook_uri: str = _hook[0] - thread_id: int = _hook[1] - webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + for chanid, _hook in self.THREADS.get('comic_smbc', {}).items(): + comic_image = io.BytesIO(comic_content) + channel = chanid + (hook_uri, thread_id) = _hook + webhook = discord.Webhook.from_url(hook_uri, session=session) - thread = self.bot.get_channel(channel)\ - .get_thread(thread_id) + _channel = self.bot.get_channel(channel) + if not _channel: + return + thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="SMBC", thread=thread) await asyncio.sleep(2) @@ -304,29 +329,32 @@ class Meme(commands.Cog): try: for comic in qc_comics: logging.debug("Trying QC...") + if not comic: + continue (comic_title, comic_url) = comic - comic_title: str = discord.utils.escape_markdown(comic_title) - comic_url: str = regex.sub(r'^http://ww\.', 'http://www.', + comic_title = discord.utils.escape_markdown(comic_title) + comic_url = regex.sub(r'^http://ww\.', 'http://www.', comic_url) - comic_url: str = regex.sub(r'\.pmg$', '.png', + comic_url = regex.sub(r'\.pmg$', '.png', comic_url) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() - comic_content: bytes = comic_request.raw.read() - ext: str = comic_url.split(".")[-1]\ + comic_content = comic_request.raw.read() + ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: - for chanid, _hook in self.THREADS.get('comic_qc').items(): - comic_image: io.BytesIO = io.BytesIO(comic_content) - channel: int = chanid - hook_uri: str = _hook[0] - thread_id: int = _hook[1] - webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + for chanid, _hook in self.THREADS.get('comic_qc', {}).items(): + comic_image = io.BytesIO(comic_content) + channel = chanid + (hook_uri, thread_id) = _hook + webhook = discord.Webhook.from_url(hook_uri, session=session) - thread: discord.Thread = self.bot.get_channel(channel)\ - .get_thread(thread_id) + _channel = self.bot.get_channel(channel) + if not _channel: + return + thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="Questionable Content", thread=thread) await asyncio.sleep(2) @@ -335,24 +363,27 @@ class Meme(commands.Cog): pass try: for comic in dino_comics: + if not comic: + continue (comic_title, comic_url) = comic - comic_title: str = discord.utils.escape_markdown(comic_title) + comic_title = discord.utils.escape_markdown(comic_title) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() - comic_content: bytes = comic_request.raw.read() + comic_content = comic_request.raw.read() ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: - for chanid, _hook in self.THREADS.get('comic_dino').items(): - comic_image: io.BytesIO = io.BytesIO(comic_content) - channel: int = chanid - hook_uri: str = _hook[0] - thread_id: int = _hook[1] - webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + for chanid, _hook in self.THREADS.get('comic_dino', {}).items(): + comic_image = io.BytesIO(comic_content) + channel = chanid + (hook_uri, thread_id) = _hook + webhook = discord.Webhook.from_url(hook_uri, session=session) - thread: discord.Thread = self.bot.get_channel(channel)\ - .get_thread(thread_id) + _channel = self.bot.get_channel(channel) + if not _channel: + return + thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="Dinosaur Comics", thread=thread) await asyncio.sleep(2) @@ -360,15 +391,17 @@ class Meme(commands.Cog): pass try: for onion in onions: + if not onion: + continue (onion_title, onion_description, onion_link, onion_video) = onion - onion_description: list[str] = textwrap.wrap(text=onion_description, + onion_description = textwrap.wrap(text=onion_description, width=860, max_lines=1)[0] embed: discord.Embed = discord.Embed(title=onion_title) embed.add_field(name="Content", value=f"{onion_description[0:960]}\n-# {onion_link}") async with ClientSession() as session: - for hook in self.NO_THREAD_WEBHOOKS.get('theonion'): - hook_uri: str = hook - webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + for hook in self.NO_THREAD_WEBHOOKS.get('theonion', {}): + hook_uri = hook + webhook = discord.Webhook.from_url(hook_uri, session=session) await webhook.send(embed=embed, username="The Onion") if onion_video: @@ -379,16 +412,18 @@ class Meme(commands.Cog): try: for thn in thns: logging.debug("Trying thn...") + if not thn: + continue (thn_title, thn_description, thn_link, thn_pubdate, thn_video) = thn - thn_description: list[str] = textwrap.wrap(text=thn_description, + thn_description = textwrap.wrap(text=thn_description, width=860, max_lines=1)[0] - embed: discord.Embed = discord.Embed(title=thn_title) + embed = discord.Embed(title=thn_title) embed.add_field(name="Content", value=f"{thn_description[0:960]}\n-# {thn_link}") embed.add_field(name="Published", value=thn_pubdate, inline=False) async with ClientSession() as session: - for hook in self.NO_THREAD_WEBHOOKS.get('thn'): - hook_uri: str = hook - webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + for hook in self.NO_THREAD_WEBHOOKS.get('thn', {}): + hook_uri = hook + webhook = discord.Webhook.from_url(hook_uri, session=session) await webhook.send(embed=embed, username="The Hacker News") if thn_video: @@ -422,11 +457,11 @@ class Meme(commands.Cog): except: traceback.print_exc() - @bridge.bridge_command() - @is_spamchan() # pylint: disable=too-many-function-args - async def meme(self, ctx) -> None: + @bridge.bridge_command() # type: ignore + @is_spamchan() + async def meme(self, ctx) -> None: """Create Meme""" - await ctx.respond(view=MemeView()) + await ctx.respond(view=MemeView()) @bridge.bridge_command(hidden=True) @commands.is_owner() diff --git a/cogs/misc.py b/cogs/misc.py index 28fcaa9..b9feeaa 100644 --- a/cogs/misc.py +++ b/cogs/misc.py @@ -12,6 +12,7 @@ from .misc_util import Util import aiosqlite as sqlite3 from sh import cowsay as cow_say, fortune # pylint: disable=no-name-in-module from discord.ext import bridge, commands, tasks +from disc_havoc import Havoc # pylint: disable=bare-except, broad-exception-caught, broad-exception-raised, global-statement # pylint: disable=too-many-lines, invalid-name @@ -25,8 +26,8 @@ BOT_CHANIDS = [] class Misc(commands.Cog): """Misc/Assorted Cog for Havoc""" - def __init__(self, bot): - self.bot: discord.Bot = bot + def __init__(self, bot: Havoc): + self.bot: Havoc = bot self.util = Util() self.COWS: list[str] = os.listdir(os.path.join("/", diff --git a/cogs/owner.py b/cogs/owner.py index ebd3ae6..f96d402 100644 --- a/cogs/owner.py +++ b/cogs/owner.py @@ -9,13 +9,14 @@ from typing import Optional import discord import requests from discord.ext import bridge, commands +from disc_havoc import Havoc import util class Owner(commands.Cog): """Owner Cog for Havoc""" - def __init__(self, bot) -> None: - self.bot: discord.Bot = bot - self.former_roles_store: dict = {} + def __init__(self, bot: Havoc) -> None: + self.bot: Havoc = bot + self.former_roles_store: dict[int, list[discord.Role]] = {} self._temperature: int = random.randrange(20, 30) @bridge.bridge_command(guild_ids=[1145182936002482196]) @@ -41,7 +42,7 @@ class Owner(commands.Cog): return await ctx.respond("Too cold! (-15°C minimum)") elif _temperature > 35: return await ctx.respond("Too hot! (35°C maximum)") - self._temperature: int = _temperature + self._temperature = _temperature return await ctx.respond(f"As per your request, I have adjusted the temperature to {_temperature} °C.") @bridge.bridge_command() @@ -54,6 +55,7 @@ class Owner(commands.Cog): Returns: None """ + self.bot.load_exts(False) await ctx.respond("Reloaded!", ephemeral=True) @@ -70,20 +72,18 @@ class Owner(commands.Cog): Returns: None """ - parameters: list[str] = parameters.split(" ") + _parameters: list[str] = parameters.split(" ") - if not len(parameters) > 1: + if not len(_parameters) > 1: return await ctx.respond("**Error**: Incorrect command usage; required: ", ephemeral=True) - channel: str = parameters[0] - channel_mentions: list[str] = discord.utils.raw_channel_mentions(channel) + channel: str = _parameters[0] + channel_mentions: list[int] = discord.utils.raw_channel_mentions(channel) if channel_mentions: - channel: str = str(channel_mentions[0]) + channel = str(channel_mentions[0]) msg: str = " ".join(parameters[1:]) - sent = await util.discord_helpers.send_message(self.bot, channel=channel, + await util.discord_helpers.send_message(self.bot, channel=channel, message=msg) - if not sent: - return await ctx.respond("**Failed.**", ephemeral=True) return await ctx.respond("**Done.**", ephemeral=True) @bridge.bridge_command() @@ -144,6 +144,8 @@ class Owner(commands.Cog): None """ try: + if not isinstance(message.channel, discord.TextChannel): + return memes_channel: discord.TextChannel = ctx.guild.get_channel(1147229098544988261) message_content: str = message.content message_author: str = message.author.display_name @@ -156,7 +158,9 @@ class Owner(commands.Cog): timeout=20).raw.read()) ext: str = item.url.split(".")[-1]\ .split("?")[0].split("&")[0] - _file: discord.File = discord.File(image, filename=f'img.{ext}') + _file = discord.File(image, filename=f'img.{ext}') + if not _file: + return # No file to move await memes_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...*\n**{message_author}:** {message_content}", file=_file) await message.delete() await ctx.respond("OK!", ephemeral=True) @@ -176,6 +180,8 @@ class Owner(commands.Cog): None """ try: + if not isinstance(message.channel, discord.TextChannel): + return drugs_channel: discord.TextChannel = ctx.guild.get_channel(1172247451047034910) message_content: str = message.content message_author: str = message.author.display_name @@ -188,7 +194,9 @@ class Owner(commands.Cog): timeout=20).raw.read()) ext: str = item.url.split(".")[-1]\ .split("?")[0].split("&")[0] - _file: discord.File = discord.File(image, filename=f'img.{ext}') + _file = discord.File(image, filename=f'img.{ext}') + if not _file: + return # No file to move await drugs_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...\ *\n**{message_author}:** {message_content}", file=_file) await message.delete() @@ -209,6 +217,8 @@ class Owner(commands.Cog): None """ try: + if not isinstance(message.channel, discord.TextChannel): + return funhouse_channel: discord.TextChannel = ctx.guild.get_channel(1213160512364478607) message_content: str = message.content message_author: str = message.author.display_name @@ -221,7 +231,7 @@ class Owner(commands.Cog): timeout=20).raw.read()) ext: str = item.url.split(".")[-1]\ .split("?")[0].split("&")[0] - _file: discord.File = discord.File(image, filename=f'img.{ext}') + _file = discord.File(image, filename=f'img.{ext}') await funhouse_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})\ ...*\n**{message_author}:** {message_content}") await message.delete() @@ -244,12 +254,14 @@ class Owner(commands.Cog): try: if not ctx.guild.id == 1145182936002482196: return # Not home server! + if not member.roles: + return # No roles audit_reason: str = f"Einsperren von {ctx.user.display_name}" - member: discord.Member = ctx.guild.get_member(member.id) + member = ctx.guild.get_member(member.id) member_display: str = member.display_name einsperren_role: discord.Role = ctx.guild.get_role(1235415059300093973) if ctx.guild.id != 1145182936002482196\ else ctx.guild.get_role(1235406301614309386) - member_roles: list[discord.Role] = [role for role in member.roles if not role.name == "@everyone"] + member_roles: list = [role for role in member.roles if not role.name == "@everyone"] member_role_names: list[str] = [str(role.name).lower() for role in member_roles] opers_chan: discord.TextChannel = ctx.guild.get_channel(1181416083287187546) if not "einsperren" in member_role_names: @@ -271,7 +283,7 @@ class Owner(commands.Cog): if not member.id in self.former_roles_store: await member.edit(roles=[]) # No roles else: - former_roles: list[discord.Role] = self.former_roles_store.get(member.id) + former_roles: list = self.former_roles_store.get(member.id, [0]) await member.edit(roles=former_roles, reason=f"De-{audit_reason}") await ctx.respond(f"{member_display} wurde von der Einsperre befreit.", ephemeral=True) diff --git a/cogs/quote.py b/cogs/quote.py index a38ddba..dd3b938 100644 --- a/cogs/quote.py +++ b/cogs/quote.py @@ -13,11 +13,12 @@ import asyncio import discord import aiosqlite as sqlite3 from discord.ext import bridge, commands +from disc_havoc import Havoc class DB: """DB Utility for Quote Cog""" - def __init__(self, bot): - self.bot = bot + def __init__(self, bot: Havoc): + self.bot: Havoc = bot self.db_path = os.path.join("/", "usr", "local", "share", "sqlite_dbs", "quotes.db") self.hp_chanid = 1157529874936909934 @@ -120,8 +121,8 @@ class DB: class Quote(commands.Cog): """Quote Cog for Havoc""" - def __init__(self, bot): - self.bot = bot + def __init__(self, bot: Havoc): + self.bot: Havoc = bot self.db = DB(self.bot) def is_homeserver(): # pylint: disable=no-method-argument diff --git a/cogs/radio.py b/cogs/radio.py index b13aeb4..c8c6b0c 100644 --- a/cogs/radio.py +++ b/cogs/radio.py @@ -7,12 +7,13 @@ from typing import Optional from discord.ext import bridge, commands, tasks from util.radio_util import get_now_playing import discord +from disc_havoc import Havoc class Radio(commands.Cog): """Radio Cog for Havoc""" - def __init__(self, bot: discord.Bot) -> None: - self.bot: discord.Bot = bot - self.channels: dict[tuple] = { + def __init__(self, bot: Havoc) -> None: + self.bot: Havoc = bot + self.channels: dict[str, tuple] = { 'sfm': (1145182936002482196, 1221615558492029050), # Tuple: Guild Id, Chan Id } self.STREAM_URL: str = "https://relay.sfm.codey.lol/aces.ogg" diff --git a/cogs/sing.py b/cogs/sing.py index 935684f..b2d26b4 100644 --- a/cogs/sing.py +++ b/cogs/sing.py @@ -10,20 +10,22 @@ import discord import regex from util.sing_util import Utility from discord.ext import bridge, commands +from disc_havoc import Havoc + BOT_CHANIDS = [] class Sing(commands.Cog): """Sing Cog for Havoc""" - def __init__(self, bot): - self.bot: discord.Bot = bot + def __init__(self, bot: Havoc) -> None: + self.bot: Havoc = bot self.utility = Utility() global BOT_CHANIDS BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit self.control_strip_regex: Pattern = regex.compile(r"\x0f|\x1f|\035|\002|\u2064|\x02|(\x03([0-9]{1,2}))|(\x03|\003)(?:\d{1,2}(?:,\d{1,2})?)?", regex.UNICODE) - def is_spamchan(): # pylint: disable=no-method-argument + def is_spamchan(): # type: ignore """Check if channel is spam chan""" def predicate(ctx): try: @@ -57,7 +59,7 @@ class Sing(commands.Cog): for _activity in ctx.author.activities: if _activity.type == discord.ActivityType.listening: - activity: discord.Activity = _activity + activity = _activity if not activity: return await ctx.respond("**Error**: No song specified, no activity found to read.") @@ -65,7 +67,9 @@ class Sing(commands.Cog): if interaction: await ctx.respond("*Searching...*", ephemeral=True) # Must respond to interactions within 3 seconds, per Discord - (search_artist, search_song, search_subsearch) = self.utility.parse_song_input(song, activity) + parsed = self.utility.parse_song_input(song, activity) + if isinstance(parsed, tuple): + (search_artist, search_song, search_subsearch) = parsed # await ctx.respond(f"So, {search_song} by {search_artist}? Subsearch: {search_subsearch} I will try...") # Commented, useful for debugging search_result: list[str] = await self.utility.lyric_search(search_artist, search_song, @@ -73,13 +77,16 @@ class Sing(commands.Cog): if len(search_result) == 1: return await ctx.respond(search_result[0].strip()) - - (search_result_artist, search_result_song, search_result_src, - search_result_confidence, search_result_time_taken) = search_result[0] # First index is a tuple + if not isinstance(search_result[0], tuple): + return # Invalid data type + ( + search_result_artist, search_result_song, search_result_src, + search_result_confidence, search_result_time_taken + ) = search_result[0] # First index is a tuple search_result_wrapped: list[str] = search_result[1] # Second index is the wrapped lyrics search_result_wrapped_short: list[str] = search_result[2] # Third is short wrapped lyrics if not ctx.channel.id in BOT_CHANIDS: - search_result_wrapped: list[str] = search_result_wrapped_short # Replace with shortened lyrics for non spamchans + search_result_wrapped = search_result_wrapped_short # Replace with shortened lyrics for non spamchans embeds: list[Optional[discord.Embed]] = [] embed_url: str = f"[on codey.lol](https://codey.lol/#{urllib.parse.quote(search_artist)}/{urllib.parse.quote(search_song)})" c: int = 0 @@ -87,8 +94,8 @@ class Sing(commands.Cog): for section in search_result_wrapped: c+=1 if c == len(search_result_wrapped): - footer: str = f"Found on: {search_result_src}" - section: str = self.control_strip_regex.sub('', section) + footer = f"Found on: {search_result_src}" + section = self.control_strip_regex.sub('', section) # if ctx.guild.id == 1145182936002482196: # section = section.upper() embed: discord.Embed = discord.Embed( @@ -131,7 +138,7 @@ class Sing(commands.Cog): activity: Optional[discord.Activity] = None for _activity in ctx.interaction.guild.get_member(member_id).activities: if _activity.type == discord.ActivityType.listening: - activity: discord.Activity = _activity + activity = _activity parsed: tuple|bool = self.utility.parse_song_input(song=None, activity=activity) if not parsed: @@ -139,43 +146,44 @@ class Sing(commands.Cog): if IS_SPAMCHAN: await ctx.respond(f"***Reading activity of {member_display}...***") - (search_artist, search_song, search_subsearch) = parsed - await ctx.respond("*Searching...*", ephemeral=True) # Must respond to interactions within 3 seconds, per Discord - search_result: list = await self.utility.lyric_search(search_artist, search_song, + if isinstance(parsed, tuple): + (search_artist, search_song, search_subsearch) = parsed + await ctx.respond("*Searching...*", ephemeral=True) # Must respond to interactions within 3 seconds, per Discord + search_result: list = await self.utility.lyric_search(search_artist, search_song, search_subsearch) - if len(search_result) == 1: - return await ctx.send(search_result[0].strip()) - - (search_result_artist, search_result_song, search_result_src, - search_result_confidence, search_result_time_taken) = search_result[0] # First index is a tuple - search_result_wrapped: list[str] = search_result[1] # Second index is the wrapped lyrics - search_result_wrapped_short: list[str] = search_result[2] # Third index is shortened lyrics - - if not IS_SPAMCHAN: - search_result_wrapped: list[str] = search_result_wrapped_short # Swap for shortened lyrics if not spam chan - - embeds: list[Optional[discord.Embed]] = [] - c: int = 0 - footer: str = "To be continued..." #Placeholder - for section in search_result_wrapped: - c+=1 - if c == len(search_result_wrapped): - footer: str = f"Found on: {search_result_src}" - # if ctx.guild.id == 1145182936002482196: - # section = section.upper() - embed: discord.Embed = discord.Embed( - title=f"{search_result_song} by {search_result_artist}", - description=discord.utils.escape_markdown(section.replace("\n", "\n\n")) - ) - embed.add_field(name="Confidence", value=search_result_confidence, inline=True) - embed.add_field(name="Time Taken", value=search_result_time_taken, inline=True) - embed.add_field(name="Link", value=f"[on codey.lol](https://codey.lol/#{urllib.parse.quote(search_result_artist)}/{urllib.parse.quote(search_result_song)})") - embed.set_footer(text=footer) - embeds.append(embed) + if len(search_result) == 1: + return await ctx.send(search_result[0].strip()) - for embed in embeds: - await ctx.send(embed=embed) + (search_result_artist, search_result_song, search_result_src, + search_result_confidence, search_result_time_taken) = search_result[0] # First index is a tuple + search_result_wrapped: list[str] = search_result[1] # Second index is the wrapped lyrics + search_result_wrapped_short: list[str] = search_result[2] # Third index is shortened lyrics + + if not IS_SPAMCHAN: + search_result_wrapped = search_result_wrapped_short # Swap for shortened lyrics if not spam chan + + embeds: list[Optional[discord.Embed]] = [] + c: int = 0 + footer: str = "To be continued..." #Placeholder + for section in search_result_wrapped: + c+=1 + if c == len(search_result_wrapped): + footer = f"Found on: {search_result_src}" + # if ctx.guild.id == 1145182936002482196: + # section = section.upper() + embed: discord.Embed = discord.Embed( + title=f"{search_result_song} by {search_result_artist}", + description=discord.utils.escape_markdown(section.replace("\n", "\n\n")) + ) + embed.add_field(name="Confidence", value=search_result_confidence, inline=True) + embed.add_field(name="Time Taken", value=search_result_time_taken, inline=True) + embed.add_field(name="Link", value=f"[on codey.lol](https://codey.lol/#{urllib.parse.quote(search_result_artist)}/{urllib.parse.quote(search_result_song)})") + embed.set_footer(text=footer) + embeds.append(embed) + + for _embed in embeds: + await ctx.send(embed=_embed) except Exception as e: traceback.print_exc() return await ctx.respond(f"ERR: {str(e)}") diff --git a/disc_havoc.py b/disc_havoc.py index 5d4e644..f6bed00 100644 --- a/disc_havoc.py +++ b/disc_havoc.py @@ -10,6 +10,7 @@ import setproctitle import hypercorn import hypercorn.asyncio from dotenv import load_dotenv +from asyncio import Task from discord.ext import bridge, commands from termcolor import colored from constants import OWNERS, BOT_CHANIDS @@ -38,50 +39,56 @@ load_dotenv() intents = discord.Intents.all() intents.message_content = True -bot = bridge.Bot(command_prefix=".", intents=intents, - owner_ids=OWNERS, activity=bot_activity, - help_command=commands.MinimalHelpCommand()) -@bot.event -async def on_ready() -> None: - """Run on Bot Ready""" - logging.info("%s online!", bot.user) +class Havoc(bridge.Bot): + def __init__(self) -> None: + super().__init__(command_prefix=".", intents=intents, + owner_ids=OWNERS, activity=bot_activity, + help_command=commands.MinimalHelpCommand()) + self.BOT_CHANIDS = BOT_CHANIDS + + def load_exts(self, initialRun: Optional[bool] = True) -> None: + """ + Load Cogs/Extensions + Args: + initialRun (Optional[bool]) default: True + Returns: + None""" + load_method = self.load_extension if initialRun\ + else self.reload_extension -def load_exts(initialRun: Optional[bool] = True) -> None: - """ - Load Cogs/Extensions - Args: - initialRun (Optional[bool]) default: True - Returns: - None""" - load_method = bot.load_extension if initialRun else bot.reload_extension + for cog in cogs_list: + logging.info("Loading: %s", cog) + load_method(f'cogs.{cog}') - for cog in cogs_list: - logging.info("Loading: %s", cog) - load_method(f'cogs.{cog}') + importlib.reload(api) + from api import API # pylint: disable=unused-import + api_config = hypercorn.config.Config() + api_config.bind = ["10.10.10.100:5992"] + api_instance = api.API(self) + try: + self.fapi_task.cancel() + except Exception as e: + logging.debug("Failed to cancel fapi_task: %s", + str(e)) + + logging.info("Starting FAPI Task") - importlib.reload(api) - from api import API # pylint: disable=unused-import - api_config = hypercorn.config.Config() - api_config.bind = "10.10.10.100:5992" - api_instance = api.API(bot) - try: - bot.fapi_task.cancel() - except Exception as e: - logging.debug("Failed to cancel fapi_task: %s", - str(e)) - - logging.info("Starting FAPI Task") - - bot.fapi_task = bot.loop.create_task(hypercorn.asyncio.serve(api_instance.api_app, + self.fapi_task: Task = self.loop.create_task(hypercorn.asyncio.serve(api_instance.api_app, api_config)) + + + @commands.Cog.listener() + async def on_ready(self) -> None: + """Run on Bot Ready""" + logging.info("%s online!", self.user) + + def __init__() -> None: logging.info(colored(f"Log level: {logging.getLevelName(logging.root.level)}", "red", attrs=['reverse'])) - bot.BOT_CHANIDS = BOT_CHANIDS - bot.load_exts = load_exts - bot.load_exts() + bot = Havoc() bot.run(os.getenv('TOKEN')) if __name__ == "__main__": diff --git a/jesusmemes.py b/jesusmemes.py new file mode 100644 index 0000000..1c30b87 --- /dev/null +++ b/jesusmemes.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3.11 + +# Jesus Meme Generator (requires Catbox uploader) + +import aiohttp # Not part of Python core +import asyncio # Part of Python core +import regex # Not part of Python core +import os # Part of Python core +import random # Part of Python core +import traceback +from urllib.parse import quote as urlquote +from catbox import Catbox # Not part of Python core + + +class JesusMemeGenerator(): + def __init__(self): + self.MEMEAPIURL = "https://apimeme.com/meme?meme=" + self.MEMESTORAGEDIR = os.path.join(os.path.expanduser("~"), "memes") # Save memes "temporarily" to the home directory-->'memes' subfolder; cleanup must be done externally + + async def create_meme(self, top_line='', bottom_line='', meme="Jesus-Talking-To-Cool-Dude"): + try: + top_line = regex.sub('[^\p{Letter}\p{Number}\p{Punctuation}\p{Horiz_Space}\p{Currency_Symbol}]', '', top_line.strip()) + bottom_line = regex.sub('[^\p{Letter}\p{Number}\p{Punctuation}\p{Horiz_Space}\p{Currency_Symbol}]', '', bottom_line.strip()) + OUT_FNAME = '' + + if len(top_line) < 1 or len(bottom_line) < 1: + return None + + formed_url = f"{self.MEMEAPIURL}{meme}&top={top_line.strip()}&bottom={bottom_line.strip()}" + formed_url = regex.sub('\p{Horiz_Space}', '+', regex.sub('#', '%23', formed_url.strip())) + timeout = aiohttp.ClientTimeout(total=15) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(formed_url) as response: + UUID = f"{random.getrandbits(8)}-{random.getrandbits(8)}" + OUT_FNAME = f"{UUID}.jpg" + with open(f"{self.MEMESTORAGEDIR}/{OUT_FNAME}", 'wb') as f: + f.write(await response.read()) + + if len (OUT_FNAME) > 0: + uploader = Catbox() + meme_link = uploader.upload(f"{self.MEMESTORAGEDIR}/{OUT_FNAME}") + return meme_link + except: + print(traceback.format_exc()) + pass + return None + + + + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..264cc2d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,28 @@ +[project] +name = "havoc" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "aiosqlite>=0.20.0", + "beautifulsoup4>=4.12.3", + "bohancompliment>=0.2.0", + "fastapi>=0.115.7", + "feedparser>=6.0.11", + "hypercorn>=0.17.3", + "nvdlib>=0.7.9", + "openai>=1.60.0", + "py-cord[voice]>=2.6.1", + "pydantic>=2.10.6", + "python-dotenv>=1.0.1", + "pytz>=2024.2", + "regex>=2024.11.6", + "setproctitle>=1.3.4", + "sh>=2.2.1", + "termcolor>=2.5.0", + "websockets>=14.2", +] + +[tool.mypy] +disable_error_code = ["import-untyped"] \ No newline at end of file diff --git a/util/discord_helpers.py b/util/discord_helpers.py index 4ad9fd9..c8fa722 100644 --- a/util/discord_helpers.py +++ b/util/discord_helpers.py @@ -18,16 +18,19 @@ async def get_channel_by_name(bot: discord.Bot, channel: str, Returns: Optional[Any] """ - channel: str = re.sub(r'^#', '', channel.strip()) + channel = re.sub(r'^#', '', channel.strip()) if not guild: return discord.utils.get(bot.get_all_channels(), name=channel) else: - channels: list = bot.get_guild(guild).channels + _guild: Optional[discord.Guild] = bot.get_guild(guild) + if not _guild: + return None + channels: list = _guild.channels for _channel in channels: if _channel.name.lower() == channel.lower().strip(): return _channel - return + return None async def send_message(bot: discord.Bot, channel: str, message: str, guild: int | None = None) -> None: @@ -43,10 +46,12 @@ async def send_message(bot: discord.Bot, channel: str, None """ if channel.isnumeric(): - channel: int = int(channel) - _channel = bot.get_channel(channel) + channel_int: int = int(channel) + _channel = bot.get_channel(channel_int) else: - channel: str = re.sub(r'^#', '', channel.strip()) + channel = re.sub(r'^#', '', channel.strip()) _channel = await get_channel_by_name(bot=bot, channel=channel, guild=guild) + if not isinstance(_channel, discord.TextChannel): + return None await _channel.send(message)