commit 59caad4c744844948f9de0ade9dbb2bbfeb38872 Author: codey Date: Thu Feb 13 14:51:35 2025 -0500 initial push diff --git a/README.md b/README.md new file mode 100644 index 0000000..1a64dea --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +![](https://codey.lol/havoc-fv.jpg) + +# Discord-Havoc Rewrite (Pycord) \ No newline at end of file diff --git a/api.py b/api.py new file mode 100644 index 0000000..43cc35e --- /dev/null +++ b/api.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3.12 + +import importlib +from typing import Optional +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +import util + +class ValidSendMsgRequest(BaseModel): + """ + - **guild**: optional, guild id in case multiple channels match (normally first result would be used) + - **channel**: channel to target + - **message**: message to send + """ + + guild: Optional[int] = None + channel: str + message: str + +class API: + """API [FastAPI Instance] for Havoc""" + def __init__(self, discord_bot): + api_app = FastAPI(title="Havoc API") + self.bot = discord_bot + self.api_app = api_app + + + @api_app.get("/{any:path}") + def block_get(): + raise HTTPException(status_code=403, detail="Invalid request") + + @api_app.post("/send_msg") + async def send_msg_handler(data: ValidSendMsgRequest): + await util.discord_helpers.send_message( + bot=self.bot, + guild=data.guild, + channel=data.channel, + message=data.message, + ) + return { + 'result': "presumed_success", + } + + +def __init__(): + import util # pylint: disable=redefined-outer-name, reimported, import-outside-toplevel + importlib.reload(util) + +__init__() \ No newline at end of file diff --git a/cogs/karma.py b/cogs/karma.py new file mode 100644 index 0000000..8069cb3 --- /dev/null +++ b/cogs/karma.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python3.12 +# pylint: disable=broad-exception-caught, bare-except, invalid-name + +import sys +from os import path +sys.path.append( path.dirname( path.dirname( path.abspath(__file__) ) ) ) +import constants +import traceback +import time +import importlib +import logging +import discord +import regex +from typing import Pattern +from aiohttp import ClientSession, ClientTimeout +from discord.ext import bridge, commands, tasks + + + + +class Util: + """Karma Utility""" + def __init__(self, bot): + self.bot = bot + self.api_key: str = constants.PRV_API_KEY + self.karma_endpoints_base_url: str = "https://api.codey.lol/karma/" + self.karma_retrieval_url: str = f"{self.karma_endpoints_base_url}get" + self.karma_update_url: str = f"{self.karma_endpoints_base_url}modify" + self.karma_top_10_url: str = f"{self.karma_endpoints_base_url}top" + self.timers: dict = {} # discord uid : timestamp, used for rate limiting + self.karma_cooldown: int = 15 # 15 seconds between karma updates + + async def get_karma(self, keyword: str) -> int|str: + """ + Get Karma for Keyword + Args: + keyword (str) + Returns: + int|str + """ + try: + async with ClientSession() as session: + async with await session.post(self.karma_retrieval_url, + json={'keyword': keyword}, + headers={ + 'content-type': 'application/json; charset=utf-8', + 'X-Authd-With': f'Bearer {constants.KARMA_API_KEY}', + }, timeout=ClientTimeout(connect=3, sock_read=5)) as request: + resp = await request.json() + return resp.get('count') + except Exception as e: + traceback.print_exc() + return f"Failed-- {type(e).__name__}: {str(e)}" + + async def get_top(self, n: int = 10) -> dict: + """ + Get top (n=10) Karma + Args: + n (int): Number of top results to return, default 10 + Returns: + dict + """ + try: + async with ClientSession() as session: + async with await session.post(self.karma_top_10_url, + json = { + 'n': n, + }, + headers={ + 'content-type': 'application/json; charset=utf-8', + 'X-Authd-With': f'Bearer {constants.KARMA_API_KEY}' + }, timeout=ClientTimeout(connect=3, sock_read=5)) as request: + resp: dict = await request.json() + return resp + except: + traceback.print_exc() + + async def get_top_embed(self, n:int = 10) -> discord.Embed: + """ + Get Top Karma Embed + Args: + n (int): Number of top results to return, default 10 + Returns: + discord.Embed + """ + top: dict = await self.get_top(n) + top_formatted: str = "" + for x, item in enumerate(top): + top_formatted += f"{x+1}. **{discord.utils.escape_markdown(item[0])}**: *{item[1]}*\n" + top_formatted: str = top_formatted.strip() + embed: discord.Embed = discord.Embed(title=f"Top {n} Karma", + description=top_formatted, + colour=0xff00ff) + return embed + + async def update_karma(self, display: str, _id: int, keyword: str, flag: int) -> bool: + """ + Update Karma for Keyword + Args: + display (str): Display name of the user who requested the update + _id (int): Discord UID of the user who requested the update + keyword (str): Keyword to update + flag (int) + """ + if not flag in [0, 1]: + return + + reqObj: dict = { + 'granter': f"Discord: {display} ({_id})", + 'keyword': keyword, + 'flag': flag, + } + + try: + async with ClientSession() as session: + async with await session.post(self.karma_update_url, + json=reqObj, + headers={ + 'content-type': 'application/json; charset=utf-8', + 'X-Authd-With': f'Bearer {self.api_key}', + }, + timeout=ClientTimeout(connect=3, sock_read=5)) as request: + result = await request.json() + return result.get('success', False) + except: + traceback.print_exc() + return False + + + async def check_cooldown(self, user_id: int) -> bool: + """ + Check if member has met cooldown period prior to adjusting karma + Args: + user_id (int): The Discord UID to check + Returns: + bool + """ + if not user_id in self.timers: + return True + now = int(time.time()) + if (now - self.timers[user_id]) < self.karma_cooldown: + return False + return True + + + +class Karma(commands.Cog): + """Karma Cog for Havoc""" + def __init__(self, bot): + importlib.reload(constants) + self.bot = bot + self.util = Util(self.bot) + # self.karma_regex = regex.compile(r'(\w+)(\+\+|\-\-)') + self.karma_regex: Pattern = regex.compile(r'(\b\w+(?:\s+\w+)*)(\+\+($|\s)|\-\-($|\s))') + self.mention_regex: Pattern = regex.compile(r'(<@([0-9]{17,20})>)(\+\+|\-\-)') + self.mention_regex_no_flag: Pattern = regex.compile(r'(<@([0-9]{17,20})>+)') + self.karma_chanid: int = 1307065684785893406 + self.karma_msgid: int = 1325442184572567686 + + # asyncio.get_event_loop().create_task(self.bot.get_channel(self.karma_chanid).send(".")) + + try: + self.update_karma_chan.start() + except Exception as e: + pass + + + @tasks.loop(seconds=30, reconnect=True) + async def update_karma_chan(self): + """Update the Karma Chan Leaderboard""" + try: + top_embed = await self.util.get_top_embed(n=25) + channel = self.bot.get_channel(self.karma_chanid) + message_to_edit = await channel.fetch_message(self.karma_msgid) + await message_to_edit.edit(embed=top_embed, + content="## This message will automatically update periodically.") + except: + traceback.print_exc() + + + @commands.Cog.listener() + async def on_message(self, message: discord.Message): + """ + Message hook, to monitor for ++/-- + Also monitors for messages to #karma to autodelete, only Havoc may post in #karma! + """ + + if message.channel.id == self.karma_chanid and not message.author.id == self.bot.user.id: + """Message to #karma not by Havoc, delete it""" + await message.delete(reason="Messages to #karma are not allowed") + removal_embed: discord.Embed = discord.Embed( + title="Message Deleted", + description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed." + ) + return await message.author.send(embed=removal_embed) + + + if message.author.id == self.bot.user.id: # Bots own message + return + + if not message.guild.id in [1145182936002482196, 1228740575235149855]: # Not a valid guild for cmd + return + + message_content: str = message.content.strip() + mentions: list = regex.findall(self.mention_regex, message_content) + + for mention in mentions: + try: + logging.debug("Mention: %s", mention) + mentioned_uid: int = int(mention[1]) + friendly_flag: int = int(mention[2]) + guild: discord.Guild = self.bot.get_guild(message.guild.id) + guild_member: discord.Member = guild.get_member(mentioned_uid) + display: str = guild_member.display_name + message_content: str = message_content.replace(mention[0], display) + logging.debug("New message: %s", message_content) + except: + traceback.print_exc() + + message_content: str = discord.utils.escape_markdown(message_content) + + karma_regex: list[str] = regex.findall(self.karma_regex, message_content.strip()) + if not karma_regex: # Not a request to adjust karma + return + + flooding: bool = not await self.util.check_cooldown(message.author.id) + exempt_uids: list[int] = [1172340700663255091, 992437729927376996] + if flooding and not message.author.id in exempt_uids: + return await message.add_reaction(emoji="❗") + + processed_keywords_lc: list[str] = [] + + logging.debug("Matched: %s", karma_regex) + + for matched_keyword in karma_regex: + if len(matched_keyword) == 4: + (keyword, friendly_flag, _, __) = matched_keyword + else: + (keyword, friendly_flag) = matched_keyword + now: int = int(time.time()) + + flag: int = None + match friendly_flag: + case "++": + flag: int = 0 + case "--": + flag: int = 1 + case _: + logging.info("Unknown flag %s", flag) + continue + + if keyword.lower() in processed_keywords_lc: + continue + + processed_keywords_lc.append(keyword.lower()) + + self.util.timers[message.author.id] = now + + updated: bool = await self.util.update_karma(message.author.display_name, + message.author.id, keyword, flag) + if updated: + return await message.add_reaction(emoji="👍") + + @bridge.bridge_command() + async def karma(self, ctx, *, keyword: str | None = None) -> None: + """With no arguments, top 10 karma is provided; a keyword can also be provided to lookup.""" + try: + if not keyword: + top_10_embed: discord.Embed = await self.util.get_top_embed() + return await ctx.respond(embed=top_10_embed) + + keyword: str = discord.utils.escape_markdown(keyword) + + mentions: list[str] = regex.findall(self.mention_regex_no_flag, keyword) + + for mention in mentions: + try: + mentioned_uid = int(mention[1]) + guild = self.bot.get_guild(ctx.guild.id) + guild_member = guild.get_member(mentioned_uid) + display = guild_member.display_name + keyword = keyword.replace(mention[0], display) + except: + traceback.print_exc() + continue + + score: int = await self.util.get_karma(keyword) + description: str = f"**{keyword}** has a karma of *{score}*" + if isinstance(score, dict) and score.get('err'): + description: str = f"*{score.get('errorText')}*" + embed: discord.Embed = discord.Embed(title=f"Karma for {keyword}", + description=description) + return await ctx.respond(embed=embed) + except Exception as e: + await ctx.respond(f"Error: {str(e)}") + traceback.print_exc() + + def cog_unload(self) -> None: + try: + self.update_karma_chan.cancel() + except: + """Safe to ignore""" + pass + + +def setup(bot) -> None: + """Run on Cog Load""" + bot.add_cog(Karma(bot)) diff --git a/cogs/lovehate.py b/cogs/lovehate.py new file mode 100644 index 0000000..f7818fa --- /dev/null +++ b/cogs/lovehate.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3.12 +# pylint: disable=broad-exception-caught + +import traceback +import logging +import os +from typing import Any, Optional +import discord +import aiosqlite as sqlite3 +from discord.ext import bridge, commands +from util.lovehate_db import DB +from constructors import LoveHateException + + +class LoveHate(commands.Cog): + """LoveHate Cog for Havoc""" + def __init__(self, bot): + self.bot: discord.Bot = bot + self.db = DB(self.bot) + + def join_with_and(self, items: list) -> str: + """ + Join list with and added before last item + Args: + items (list) + Returns: + str + """ + if len(items) > 1: + return ', '.join(items[:-1]) + ' and ' + items[-1] + return items[0] if items else '' + + + @bridge.bridge_command() + async def loves(self, ctx, user: Optional[str] = None) -> None: + """ + If keyword isn't provided, returns the things YOU love; specify a user to find what THEY love. + Args: + ctx (Any) + user (Optional[str]) + Returns: + None + """ + try: + if not user: + loves: list[tuple] = await self.db.get_lovehates(user=ctx.author.display_name, loves=True) + if not loves: + return await ctx.respond("You don't seem to love anything...") + + out_loves: list = [] + for love in loves: + (love,) = love + out_loves.append(love) + + out_loves_str: str = self.join_with_and(out_loves) + return await ctx.respond(f"{ctx.author.mention} loves {out_loves_str}") + + loves: list[tuple] = await self.db.get_lovehates(user=user.strip(), loves=True) + if not loves: + return await ctx.respond(f"{user} doesn't seem to love anything...") + + out_loves_str: str = self.join_with_and(out_loves) + return await ctx.respond(f"{user} loves {out_loves_str}") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + @bridge.bridge_command() + async def wholoves(self, ctx, *, thing: Optional[str] = None) -> None: + """ + Check who loves + Args: + ctx (Any) + thing (Optional[str]) + Returns: + None + """ + try: + if not thing: + thing: str = ctx.author.display_name + if discord.utils.raw_mentions(thing): + # There are mentions + thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention + thing: str = self.bot.get_guild(ctx.guild.id).get_member(thing_id).display_name + + who_loves: list[tuple] = await self.db.get_wholovehates(thing=thing, + loves=True) + if not who_loves: + return await ctx.respond(f"I couldn't find anyone who loves {thing}...") + + out_wholoves: list = [] + for lover in who_loves: + (lover,) = lover + out_wholoves.append(str(lover)) + + optional_s: str = "s" if len(out_wholoves) == 1 else "" + + out_wholoves_str: str = self.join_with_and(out_wholoves) + + return await ctx.respond(f"{out_wholoves_str} love{optional_s} {thing}") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + @bridge.bridge_command() + async def whohates(self, ctx, *, thing: Optional[str] = None) -> None: + """ + Check who hates + Args: + ctx (Any) + thing (Optional[str]) + Returns: + None + """ + try: + if not thing: + thing: str = ctx.author.display_name + if discord.utils.raw_mentions(thing): + # There are mentions + thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention + thing: str = self.bot.get_guild(ctx.guild.id).get_member(thing_id).display_name + + who_hates: list[tuple] = await self.db.get_wholovehates(thing=thing, + hates=True) + if not who_hates: + return await ctx.respond(f"I couldn't find anyone who hates {thing}...") + + out_whohates: list = [] + for hater in who_hates: + (hater,) = hater + out_whohates.append(str(hater)) + + optional_s: str = "s" if len(out_whohates) == 1 else "" + + out_whohates_str: str = self.join_with_and(out_whohates) + + return await ctx.respond(f"{out_whohates_str} hate{optional_s} {thing}") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + + @bridge.bridge_command() + async def dontcare(self, ctx, thing: str) -> None: + """ + Make me forget your opinion on + Args: + ctx (Any) + thing (str) + Returns: + None + """ + try: + stop_caring: str = await self.db.update(ctx.author.display_name, + thing, 0) + return await ctx.respond(stop_caring) + except Exception as e: + await ctx.respond(f"Error: {str(e)}") + traceback.print_exc() + + @bridge.bridge_command() + async def hates(self, ctx, user: Optional[str] = None) -> None: + """ + If keyword isn't provided, returns the things YOU hate; specify a user to find what THEY hate. + Args: + ctx (Any) + user (Optional[str]) + Returns: + None + """ + try: + if not user: + hates: list[tuple] = await self.db.get_lovehates(user=ctx.author.display_name, + hates=True) + if not hates: + return await ctx.respond("You don't seem to hate anything...") + + out_hates: list = [] + for hated_thing in hates: + (hated_thing,) = hated_thing + out_hates.append(str(hated_thing)) + + out_hates_str: str = self.join_with_and(out_hates) + + return await ctx.respond(f"{ctx.author.mention} hates {out_hates_str}") + + hates: list[tuple] = await self.db.get_lovehates(user=user.strip(), hates=True) + if not hates: + return await ctx.respond(f"{user} doesn't seem to hate anything...") + + out_hates_str: str = self.join_with_and(hates) + + return await ctx.respond(f"{user} hates {out_hates_str}") + except Exception as e: + await ctx.respond(f"Error: {str(e)}") + traceback.print_exc() + + @bridge.bridge_command(aliases=['sarcastichate']) + async def love(self, ctx, *, thing: str) -> None: + """ + Love + Args: + ctx (Any) + thing (str) + Returns: + None + """ + try: + if discord.utils.raw_mentions(thing): + # There are mentions + thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention + thing: str = self.bot.get_guild(ctx.guild.id).get_member(thing_id).display_name + + love: str = await self.db.update(ctx.author.display_name, + thing, 1) + return await ctx.respond(love) + except Exception as e: + await ctx.respond(f"Error: {str(e)}") + traceback.print_exc() + + @bridge.bridge_command(aliases=['sarcasticlove']) + async def hate(self, ctx, *, thing: str) -> None: + """ + Hate + Args: + ctx (Any) + thing (str) + Returns: + None + """ + try: + if discord.utils.raw_mentions(thing): + # There are mentions + thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention + thing: str = self.bot.get_guild(ctx.guild.id).get_member(thing_id).display_name + hate: str = await self.db.update(ctx.author.display_name, + thing, -1) + return await ctx.respond(hate) + except Exception as e: + await ctx.respond(f"Error: {str(e)}") + traceback.print_exc() + + def cog_unload(self) -> None: + # not needed currently + pass + + +def setup(bot) -> None: + """Run on Cog Load""" + bot.add_cog(LoveHate(bot)) \ No newline at end of file diff --git a/cogs/meme.py b/cogs/meme.py new file mode 100644 index 0000000..4a4165a --- /dev/null +++ b/cogs/meme.py @@ -0,0 +1,458 @@ +#!/usr/bin/env python3.12 + +import os +import traceback +import json +import io +import asyncio +import random +from typing import LiteralString, Optional +import logging +import textwrap +import regex +import requests +import discord +from aiohttp import ClientSession +from discord.ext import bridge, commands, tasks +from jesusmemes import JesusMemeGenerator +import scrapers.reddit_scrape as memeg +import scrapers.explosm_scrape as explosmg +import scrapers.xkcd_scrape as xkcdg +import scrapers.smbc_scrape as smbcg +import scrapers.qc_scrape as qcg +import scrapers.dinosaur_scrape as dinog +import scrapers.onion_scrape as oniong +import scrapers.thn_scrape as thng +import constants +# pylint: disable=global-statement, bare-except, invalid-name, line-too-long + +meme_choices = [] +BOT_CHANIDS = [] + +class Helper: + """Meme Helper""" + def load_meme_choices(self) -> None: + """Load Available Meme Templates from JSON File""" + global meme_choices + + memes_file: str|LiteralString = os.path.join(os.path.dirname(__file__), "memes.json") + with open(memes_file, 'r', encoding='utf-8') as f: + meme_choices = json.loads(f.read()) + +class MemeView(discord.ui.View): + """Meme Selection discord.ui.View""" + helper = Helper() + helper.load_meme_choices() + @discord.ui.select( + placeholder = "Choose a Meme!", + min_values = 1, + max_values = 1, + options = [ + discord.SelectOption( + label=meme_label + ) for meme_label in meme_choices[0:24] + ] + ) + async def select_callback(self, select: discord.ui.Select, + interaction: discord.Interaction) -> None: + """Meme Selection Callback""" + modal: discord.ui.Modal = MemeModal(meme=select.values[0], title="Meme Selected") + await interaction.response.send_modal(modal) + +class MemeModal(discord.ui.Modal): + """Meme Creation discord.ui.Modal""" + def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.selected_meme: str = meme + self.meme_generator = JesusMemeGenerator() + self.TEXT_LIMIT: int = 80 + + self.add_item(discord.ui.InputText(label="Top Text", + style=discord.InputTextStyle.singleline)) + self.add_item(discord.ui.InputText(label="Bottom Text", + style=discord.InputTextStyle.singleline)) + + async def callback(self, interaction: discord.Interaction) -> None: + selected_meme: str = self.selected_meme + meme_top_line: str = self.children[0].value.strip() + meme_bottom_line: str = self.children[1].value.strip() + if len(meme_top_line) > self.TEXT_LIMIT or len(meme_bottom_line) > self.TEXT_LIMIT: + await interaction.response.send_message("ERR: Text is limited to 80 characters for each the top and bottom lines.") + return + + meme_link: str = await self.meme_generator.create_meme(top_line=meme_top_line, bottom_line=meme_bottom_line, meme=selected_meme) + + embed: discord.Embed = discord.Embed(title="Generated Meme") + embed.set_image(url=meme_link) + embed.add_field(name="Meme", value=self.selected_meme, inline=True) + await interaction.response.send_message(embeds=[embed]) + return + +class Meme(commands.Cog): + """Meme Cog for Havoc""" + + def __init__(self, bot) -> None: + self.bot: discord.Bot = bot + self.meme_choices: list = [] + self.meme_counter: int = 0 + self.THREADS: dict[dict[list]] = { + # Format: Guild1: [ChanId : [Webhook, ThreadId], Guild2: [ChanId : [Webhook, ThreadId] + 'comic_explosm': { + 1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367], + 1306414795049926676: [constants.EXPLOSM_WEBHOOK2, 1306416492304138364], + }, + 'comic_xkcd': { + 1298729744216359055: [constants.XKCD_WEBHOOK, 1299165928755433483], + 1306414795049926676: [constants.XKCD_WEBHOOK2, 1306416681991798854], + }, + 'comic_smbc': { + 1298729744216359055: [constants.SMBC_WEBHOOK, 1299166071038808104], + 1306414795049926676: [constants.SMBC_WEBHOOK2, 1306416842511745024], + }, + 'comic_qc': { + 1298729744216359055: [constants.QC_WEBHOOK, 1299392115364593674], + 1306414795049926676: [constants.QC_WEBHOOK2, 1306417084774744114], + }, + 'comic_dino': { + 1298729744216359055: [constants.DINO_WEBHOOK, 1299771918886506557], + 1306414795049926676: [constants.DINO_WEBHOOK2, 1306417286713704548], + } + } + + self.NO_THREAD_WEBHOOKS: dict[list] = { + 'theonion': [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2], + 'thn': [constants.THN_WEBHOOK], + 'memes': [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2], + } + + global BOT_CHANIDS + BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit + + self.meme_stream_loop.start() + self.explosm_loop.start() + + def is_spamchan() -> bool: # pylint: disable=no-method-argument + """Check if channel is spamchan""" + def predicate(ctx): + try: + if not ctx.channel.id in BOT_CHANIDS: + logging.debug("%s not found in %s", ctx.channel.id, BOT_CHANIDS) + return ctx.channel.id in BOT_CHANIDS + except: + traceback.print_exc() + return False + return commands.check(predicate) + + + + async def do_autos(self, only_comics: Optional[bool] = False) -> None: + """ + Run Auto Posters + Args: + only_comics (Optional[bool]): default False + Returns: + None + """ + try: + meme_grabber = memeg.MemeGrabber() + explosm_grabber = explosmg.ExplosmGrabber() + xkcd_grabber = xkcdg.XKCDGrabber() + smbc_grabber = smbcg.SMBCGrabber() + qc_grabber = qcg.QCGrabber() + dino_grabber = dinog.DinosaurGrabber() + onion_grabber = oniong.OnionGrabber() + thn_grabber = thng.THNGrabber() + explosm_comics = xkcd_comics = smbc_comics = qc_comics = dino_comics\ + = onions = thns = [] + memes: list[tuple] = await meme_grabber.get() + try: + try: + explosm_comics: list[tuple] = await explosm_grabber.get() + except: + pass + try: + xkcd_comics: list[tuple] = await xkcd_grabber.get() + except: + pass + try: + smbc_comics: list[tuple] = await smbc_grabber.get() + except: + pass + try: + qc_comics: list[tuple] = await qc_grabber.get() + print(f"QC: {qc_comics}") + except: + pass + try: + dino_comics: list[tuple] = await dino_grabber.get() + except Exception as e: + logging.debug("Dino failed: %s", str(e)) + pass + try: + onions: list[tuple] = await onion_grabber.get() + except Exception as e: + logging.debug("Onion failed: %s", str(e)) + pass + try: + thns: list[tuple] = await thn_grabber.get() + except Exception as e: + logging.debug("THNs failed: %s", str(e)) + pass + except: + traceback.print_exc() + agents: list[str] = constants.HTTP_UA_LIST + headers: dict = { + 'User-Agent': random.choice(agents) + } + if not only_comics: + try: + for meme in memes: + (meme_id, meme_title, meme_url) = meme # pylint: disable=unused-variable + request = requests.get(meme_url, stream=True, timeout=(5, 30), headers=headers) + if not request.status_code == 200: + continue + meme_content: bytes = request.raw.read() + for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes'): + meme_image: io.BytesIO = io.BytesIO(meme_content) + ext: str = meme_url.split(".")[-1]\ + .split("?")[0].split("&")[0] + async with ClientSession() as session: + webhook: discord.Webhook = discord.Webhook.from_url(meme_hook, + session=session) + await webhook.send(file=discord.File(meme_image, + filename=f'img.{ext}'), username="r/memes") + await asyncio.sleep(2) + except: + pass + try: + for comic in explosm_comics: + (comic_title, comic_url) = comic + comic_title: str = discord.utils.escape_markdown(comic_title) + comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) + comic_request.raise_for_status() + comic_content: bytes = comic_request.raw.read() + ext: str = comic_url.split(".")[-1]\ + .split("?")[0].split("&")[0] + + async with ClientSession() as session: + for chanid, _hook in self.THREADS.get('comic_explosm').items(): + comic_image: io.BytesIO = io.BytesIO(comic_content) + channel: int = chanid + hook_uri: str = _hook[0] + thread_id: int = _hook[1] + webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + session=session) + thread: discord.Thread = self.bot.get_channel(channel)\ + .get_thread(thread_id) + await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), + username="Cyanide & Happiness", thread=thread) + await asyncio.sleep(2) + except: + pass + try: + for comic in xkcd_comics: + (comic_title, comic_url) = comic + comic_title: str = discord.utils.escape_markdown(comic_title) + comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) + comic_request.raise_for_status() + comic_content: bytes = comic_request.raw.read() + comic_image: io.BytesIO = io.BytesIO(comic_request.raw.read()) + ext: str = comic_url.split(".")[-1]\ + .split("?")[0].split("&")[0] + + async with ClientSession() as session: + for chanid, _hook in self.THREADS.get('comic_xkcd').items(): + comic_image: io.BytesIO = io.BytesIO(comic_content) + channel: int = chanid + hook_uri: str = _hook[0] + thread_id: int = _hook[1] + webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + session=session) + thread: discord.Thread = self.bot.get_channel(channel)\ + .get_thread(thread_id) + await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), + username="xkcd", thread=thread) + await asyncio.sleep(2) + except: + pass + try: + for comic in smbc_comics: + (comic_title, comic_url) = comic + comic_title: str = discord.utils.escape_markdown(comic_title) + comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) + comic_request.raise_for_status() + comic_content: bytes = comic_request.raw.read() + ext: str = comic_url.split(".")[-1]\ + .split("?")[0].split("&")[0] + + async with ClientSession() as session: + for chanid, _hook in self.THREADS.get('comic_smbc').items(): + comic_image: io.BytesIO = io.BytesIO(comic_content) + channel: int = chanid + hook_uri: str = _hook[0] + thread_id: int = _hook[1] + webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + session=session) + thread = self.bot.get_channel(channel)\ + .get_thread(thread_id) + await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), + username="SMBC", thread=thread) + await asyncio.sleep(2) + except: + pass + try: + for comic in qc_comics: + logging.debug("Trying QC...") + (comic_title, comic_url) = comic + comic_title: str = discord.utils.escape_markdown(comic_title) + comic_url: str = regex.sub(r'^http://ww\.', 'http://www.', + comic_url) + comic_url: str = regex.sub(r'\.pmg$', '.png', + comic_url) + comic_request = requests.get(comic_url, stream=True, + timeout=(5, 20), headers=headers) + comic_request.raise_for_status() + comic_content: bytes = comic_request.raw.read() + ext: str = comic_url.split(".")[-1]\ + .split("?")[0].split("&")[0] + + async with ClientSession() as session: + for chanid, _hook in self.THREADS.get('comic_qc').items(): + comic_image: io.BytesIO = io.BytesIO(comic_content) + channel: int = chanid + hook_uri: str = _hook[0] + thread_id: int = _hook[1] + webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + session=session) + thread: discord.Thread = self.bot.get_channel(channel)\ + .get_thread(thread_id) + await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), + username="Questionable Content", thread=thread) + await asyncio.sleep(2) + except: + traceback.print_exc() + pass + try: + for comic in dino_comics: + (comic_title, comic_url) = comic + comic_title: str = discord.utils.escape_markdown(comic_title) + comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) + comic_request.raise_for_status() + comic_content: bytes = comic_request.raw.read() + ext = comic_url.split(".")[-1]\ + .split("?")[0].split("&")[0] + + async with ClientSession() as session: + for chanid, _hook in self.THREADS.get('comic_dino').items(): + comic_image: io.BytesIO = io.BytesIO(comic_content) + channel: int = chanid + hook_uri: str = _hook[0] + thread_id: int = _hook[1] + webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + session=session) + thread: discord.Thread = self.bot.get_channel(channel)\ + .get_thread(thread_id) + await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), + username="Dinosaur Comics", thread=thread) + await asyncio.sleep(2) + except: + pass + try: + for onion in onions: + (onion_title, onion_description, onion_link, onion_video) = onion + onion_description: list[str] = textwrap.wrap(text=onion_description, + width=860, max_lines=1)[0] + embed: discord.Embed = discord.Embed(title=onion_title) + embed.add_field(name="Content", value=f"{onion_description[0:960]}\n-# {onion_link}") + async with ClientSession() as session: + for hook in self.NO_THREAD_WEBHOOKS.get('theonion'): + hook_uri: str = hook + webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + session=session) + await webhook.send(embed=embed, username="The Onion") + if onion_video: + await webhook.send(f"^ video: {onion_video}") + await asyncio.sleep(2) + except: + pass + try: + for thn in thns: + logging.debug("Trying thn...") + (thn_title, thn_description, thn_link, thn_pubdate, thn_video) = thn + thn_description: list[str] = textwrap.wrap(text=thn_description, + width=860, max_lines=1)[0] + embed: discord.Embed = discord.Embed(title=thn_title) + embed.add_field(name="Content", value=f"{thn_description[0:960]}\n-# {thn_link}") + embed.add_field(name="Published", value=thn_pubdate, inline=False) + async with ClientSession() as session: + for hook in self.NO_THREAD_WEBHOOKS.get('thn'): + hook_uri: str = hook + webhook: discord.Webhook = discord.Webhook.from_url(hook_uri, + session=session) + await webhook.send(embed=embed, username="The Hacker News") + if thn_video: + await webhook.send(f"^ video: {thn_video}") + await asyncio.sleep(2) + except: + pass + except: + # await self.bot.get_channel(self.MEMESTREAM_CHANID).send(f"FUCK, MY MEEMER! YOU DENTED MY MEEMER!") + traceback.print_exc() + + @tasks.loop(hours=12.0) + async def meme_stream_loop(self) -> None: + """Meme Stream Loop (r/memes)""" + try: + await asyncio.sleep(10) # Try to ensure we are ready first + self.meme_counter += 1 + if self.meme_counter == 1: + return await self.do_autos(only_comics=True) # Skip first iteration! + + await self.do_autos() + except: + traceback.print_exc() + + @tasks.loop(hours=0.5) + async def explosm_loop(self) -> None: + """Comic Loop""" + try: + await asyncio.sleep(10) # Try to ensure we are ready first + await self.do_autos(only_comics=True) + except: + traceback.print_exc() + + @bridge.bridge_command() + @is_spamchan() # pylint: disable=too-many-function-args + async def meme(self, ctx) -> None: + """Create Meme""" + await ctx.respond(view=MemeView()) + + @bridge.bridge_command(hidden=True) + @commands.is_owner() + async def domemestream(self, ctx) -> None: + """Run Meme Stream Auto Post""" + try: + await ctx.respond("Trying!", ephemeral=True) + await self.do_autos() + except: + await ctx.respond("Fuck! :(", ephemeral=True) + traceback.print_exc() + + @bridge.bridge_command(hidden=True) + @commands.is_owner() + async def doexplosm(self, ctx) -> None: + """Run Comic Auto Posters""" + try: + await ctx.respond("Trying!", ephemeral=True) + await self.do_autos(only_comics=True) + except: + await ctx.respond("Fuck! :(", ephemeral=True) + traceback.print_exc() + + def cog_unload(self) -> None: + self.meme_stream_loop.cancel() + self.explosm_loop.cancel() + +def setup(bot) -> None: + """Run on Cog Load""" + bot.add_cog(Meme(bot)) \ No newline at end of file diff --git a/cogs/memes.json b/cogs/memes.json new file mode 100644 index 0000000..23b56b7 --- /dev/null +++ b/cogs/memes.json @@ -0,0 +1,14 @@ +[ + "10-Guy", + "1990s-First-World-Problems", + "Bill-Nye-The-Science-Guy", + "Dont-You-Squidward", + "Grumpy-Cat", + "I-Was-Told-There-Would-Be", + "I-Know-Fuck-Me-Right", + "Putin", + "Michael-Jackson-Popcorn", + "Peter-Griffin-News", + "Shut-Up-And-Take-My-Money-Fry", + "Relaxed-Office-Guy" +] \ No newline at end of file diff --git a/cogs/misc.py b/cogs/misc.py new file mode 100644 index 0000000..ed3e08a --- /dev/null +++ b/cogs/misc.py @@ -0,0 +1,1253 @@ +#!/usr/bin/env python3.12 + +import os +import traceback +import urllib +import datetime +import random +from typing import Optional, LiteralString +import logging +import discord +from .misc_util import Util +import aiosqlite as sqlite3 +from sh import cowsay as cow_say, fortune # pylint: disable=no-name-in-module +from discord.ext import bridge, commands, tasks +# pylint: disable=bare-except, broad-exception-caught, broad-exception-raised, global-statement +# pylint: disable=too-many-lines, invalid-name + +""" +This plugin encompasses numerous tiny commands/functions that +do not necessitate their own cogs +""" + +DRUGS_CHANID = 1172247451047034910 +BOT_CHANIDS = [] + +class Misc(commands.Cog): + """Misc/Assorted Cog for Havoc""" + def __init__(self, bot): + self.bot: discord.Bot = bot + self.util = Util() + + self.COWS: list[str] = os.listdir(os.path.join("/", + "usr", + "share", + "cowsay", + "cows")) + self.FATES: list[str] = ['into a pile of white dog shit, face first', + 'onto the floor', + 'into a volcano', + 'into a toaster bath', + 'into a pit of venomous snakes', + 'into oncoming traffic', + 'off a bridge', + 'into the roaring 20\'s', + 'into an unknown orifice', + 'into the large hadron collider', + 'into an awkward nude group hug', + 'into a swinger party', + 'into an adoption agency, because even their parents didn\'t want them', + 'into the gas chamber for a shower', + 'into a tub of Jello', + 'into a trap full of Devils Snare', + 'down the stairs', + 'into Uranus', + 'into an enthralling and extended conversation about berries', + 'directly into Mordor', + 'into a giant mousetrap', + 'into a room full of exploding balloons', + 'into a giant blender', + 'into a giant microwave', + ] + self.DRUGS_CHANID = DRUGS_CHANID + global BOT_CHANIDS + BOT_CHANIDS = self.bot.BOT_CHANIDS + + def is_spamchan() -> bool: # pylint: disable=no-method-argument + """Check if channel is spamchan""" + def predicate(ctx): + try: + if not ctx.channel.id in BOT_CHANIDS: + logging.debug("%s not found in %s", ctx.channel.id, BOT_CHANIDS) + return ctx.channel.id in BOT_CHANIDS + except: + traceback.print_exc() + return False + return commands.check(predicate) + + def is_spamchan_or_drugs() -> bool: # pylint: disable=no-method-argument + """Check if channel is spamchan or drugs chan""" + def predicate(ctx): + try: + if not ctx.channel.id in BOT_CHANIDS and not ctx.channel.id == DRUGS_CHANID: + logging.debug("%s not found in %s and it isnt %s", ctx.channel.id, BOT_CHANIDS, DRUGS_CHANID) + return ctx.channel.id in BOT_CHANIDS or ctx.channel.id == DRUGS_CHANID + except: + traceback.print_exc() + return False + return commands.check(predicate) + + + async def get_random_guild_member(self, online_only: Optional[bool] = False) -> str: + """ + Get Random Guild Member + Args: + online_only (Optional[bool]) + Returns: + str + """ + guild = self.bot.get_guild(1145182936002482196) + if not online_only: + guild_members = [str(member.display_name) for member in guild.members if not member.bot] + else: + guild_members = [str(member.display_name) for member in guild.members if not member.bot and member.status in [ + discord.Status.online, discord.Status.idle]] + return random.choice(guild_members) + + @bridge.bridge_command() + async def stats(self, ctx) -> None: + """ + Get Stats + Args: + ctx (Any) + Returns: + None + """ + try: + stats_embed: discord.Embed = await self.util.get_stats_embed() + return await ctx.respond(embed=stats_embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + @bridge.bridge_command() + @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + async def listcoffees(self, ctx) -> None: + """ + List Available Coffees + Args: + ctx (Any) + Returns: + None + """ + coffees: str = "" + try: + for coffee in self.util.COFFEES: + coffees += f"**- {coffee}**\n" + embed: discord.Embed = discord.Embed(title="Available Coffees", + description=coffees.strip()) + return await ctx.respond(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + @bridge.bridge_command() + @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + async def listshoves(self, ctx) -> None: + """ + List Available Fates for shove command + Args: + ctx (Any) + Returns: + None + """ + fates: str = "" + try: + for fate in self.FATES: + fates += f"**- {fate}**\n" + embed: discord.Embed = discord.Embed(title="Available Fates (for .shove)", + description=fates.strip()) + return await ctx.respond(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + + @bridge.bridge_command() + async def xmas(self, ctx) -> None: + """ + Countdown til xmas! + Args: + ctx (Any) + Returns: + None + """ + try: + emojis: dict = { + '0': '0️⃣', + '1': '1️⃣', + '2': '2️⃣', + '3': '3️⃣', + '4': '4️⃣', + '5': '5️⃣', + '6': '6️⃣', + '7': '7️⃣', + '8': '8️⃣', + '9': '9️⃣', + } + with ctx.channel.typing(): + (days, hours, minutes, seconds, ms, us) = self.util.get_days_to_xmas() # pylint: disable=unused-variable + now: datetime.datetime = datetime.datetime.now() + if now.month == 12 and now.day == 25: + return await ctx.respond("# IT IS CHRISTMAS!!!!!!!!\n-# keep the change, you filthy animal") + if days > 200 or days < 0: + return await ctx.respond("We ain't fuckin talkin bout that yet") + xmas_trans: dict = str.maketrans(emojis) + try: + await ctx.message.add_reaction(emoji="🎄") + except Exception as e: + logging.debug("Failed to add xmas reaction: %s", + str(e)) + await ctx.respond(f"Only {days} days, {hours} hours, {minutes} minutes,\ + {seconds} seconds and {ms} ms left! (UTC)".translate(xmas_trans)) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + @bridge.bridge_command() + async def randfact(self, ctx) -> None: + """ + Get a random (useless) fact! + Args: + ctx (Any) + Returns: + None + """ + try: + with ctx.channel.typing(): + fact: str = await self.util.get_random_fact() + return await ctx.respond(discord.utils.escape_markdown(fact.strip())) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + + @bridge.bridge_command() + async def insult(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Insult Someone (or yourself) + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + try: + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + else: + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + with ctx.channel.typing(): + insult: str = await self.util.get_insult(recipient) + if insult: + return await ctx.respond(insult) + return await ctx.respond("Insult failed :(") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Insult failed :(\nError: {str(e)}") + + @bridge.bridge_command(aliases=['whiskey']) + async def whisky(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Get a whisky for yourself or a friend! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + try: + if not recipient: + recipient: str = ctx.author.display_name + else: + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + + (choice_name, choice_category, choice_description) = await self.util.get_whisky() + embed: discord.Embed = discord.Embed(title=f"Whisky for {recipient}: {choice_name}", + description=choice_description.strip()) + embed.add_field(name="Category", value=choice_category, inline=True) + embed.set_footer(text=f"Cheers, {recipient}!") + await self.util.increment_counter("whiskeys") + return await ctx.respond(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + + @bridge.bridge_command() + async def drink(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Get a cocktail for yourself or a friend! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + try: + if not recipient: + recipient: str = ctx.author.display_name + else: + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + + (choice_name, choice_ingredients) = await self.util.get_drink() + await ctx.respond(f"*is mixing up **{choice_name}** for {recipient.strip()}*") + embed: discord.Embed = discord.Embed(title=f"Cocktail for {recipient}", + description=choice_name) + embed.add_field(name="Ingredients", + value=discord.utils.escape_markdown(choice_ingredients), + inline=True) + embed.set_footer(text=f"Cheers, {recipient}!") + await self.util.increment_counter("mixed_drinks") + return await ctx.respond(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def spray(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Spray someone with water! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + await ctx.respond(f"*sprays **{recipient_normal}** with water*") + await self.util.increment_counter("water_sprays") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def barfbag(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Pass someone (or yourself) a barf bag! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + await ctx.respond(f"*passes **{recipient_normal}** a barf bag*") + await self.util.increment_counter("barf_bags") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def tea(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Tea! + Args: + recipient (Optional[str]) + Returns: + None + """ + tea: str = "a cup of tea" + + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + if recipient == "rhodes": + tea: str = "a cup of Harney & Sons Hot Cinnamon Spice Tea" + elif ctx.author.id == 992437729927376996 or recipient.lower() in ["kriegerin", + "traurigkeit", + "krieg", + "kriegs", + "cyberkrieg", + "ck"]: + tea: str = "a cup of earl grey, light and sweet" + response = await ctx.respond(f"*hands **{recipient_normal}** {tea}*") + await self.util.increment_counter("teas") + try: + return await response.add_reaction(emoji="🫖") + except Exception as e: + logging.debug("Failed to add tea reaction: %s", + str(e)) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + async def cowsay(self, ctx, *, + message: str) -> None: + """ + Cowsay! + Args: + ctx (Any) + message (str) + Returns: + None + """ + try: + cowfile: str = random.choice(self.COWS).replace(".cow", "") + cow_said: str = cow_say(f'-f{cowfile}', message) + response: str = f"```{cow_said}```\n-# Chosen cow: {cowfile}" + return await ctx.respond(response) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + + @bridge.bridge_command() + @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + async def fortune(self, ctx, + cowfile: Optional[str] = None) -> None: + """ + Fortune | Cowsay! + Args: + cowfile (Optional[str]) + Returns: + None + """ + try: + if not cowfile: + cowfile: str = random.choice(self.COWS).replace(".cow", "") + + if not f'{cowfile}.cow' in self.COWS: + return await ctx.respond(f"Unknown cow {cowfile}, who dat?") + + fortune_said: str = str(fortune('-n1000', '-s')) + cow_said: str = cow_say(f'-f{cowfile}', fortune_said) + response: str = f"```{cow_said}```\n-# Chosen cow: {cowfile}" + return await ctx.respond(response) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + async def listcows(self, ctx) -> None: + """ + List available .cow files (for cowsay) + Args: + ctx (Any) + Returns: + None + """ + cow_list: str = "" + try: + for cow in self.COWS: + cow: str = cow.replace(".cow", "") + cow_list += f"- **{cow}**\n" + + embed: discord.Embed = discord.Embed(title="List of .cows", + description=cow_list.strip()) + return await ctx.respond(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def cyanide(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Cyanide! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + response = await ctx.respond(f"*doses **{recipient_normal}** with Zyklon-B*") + await self.util.increment_counter("cyanides") + try: + await response.add_reaction(emoji="☠️") + return await response.add_reaction(emoji="🇩🇪") + except Exception as e: + logging.debug("Failed to add cynaide reaction: %s", + str(e)) + except: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def gravy(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + School gravy! (It's deadly) + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + response = await ctx.respond(f"*doses **{recipient_normal}** with school gravy*") + await self.util.increment_counter("gravies") + try: + await response.add_reaction(emoji="☠️") + return await response.add_reaction(emoji="🏫") + except Exception as e: + logging.debug("Failed to add gravy reaction: %s", + str(e)) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def water(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Cold water! Drink up. + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + response = await ctx.respond(f"*hands **{recipient_normal}** a cold glass of water*") + await self.util.increment_counter("waters") + try: + return await response.add_reaction(emoji="💧") + except Exception as e: + logging.debug("Failed to add water reaction: %s", + str(e)) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command(aliases=['bully']) + async def shove(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Shove someone! (Or yourself) + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + chosen_fate: str = random.choice(self.FATES) + + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + await ctx.respond(f"*shoves **{recipient_normal}** {chosen_fate}*") + await self.util.increment_counter("shoves") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def coffee(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Coffee! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + chosen_coffee: str = self.util.get_coffee() + response = await ctx.respond(f"*hands **{recipient_normal}** {chosen_coffee}*") + await self.util.increment_counter("coffees") + try: + return await response.add_reaction(emoji="☕") + except Exception as e: + logging.debug("Failed to add coffee reaction: %s", + str(e)) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command(aliases=['cookies']) + async def cookie(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Cookies! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + else: + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + chosen_cookie: dict = await self.util.get_cookie() + embed: discord.Embed = discord.Embed(title=f"Cookie for {recipient}", + description=f"Have a {chosen_cookie.get('name')}", + colour=discord.Colour.orange(), + image=chosen_cookie.get('image_url')) + embed.add_field(name="Origin", + value=chosen_cookie.get('origin')) + await ctx.respond(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command(aliases=['hashbrowns', 'potato']) + async def hashbrown(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Hashbrowns! + Args: + ctx (Any) + recipient (Optional[str]) + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + await ctx.respond(f"*hands **{recipient_normal}** 2 warm hashbrowns*") + await self.util.increment_counter("hashbrowns") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def ritalini(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Ritalini! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + response = await ctx.respond(f"*serves **{recipient_normal}** a plate of ritalini* 😉") + await response.add_reaction(emoji="💊") + await response.add_reaction(emoji="🍝") + await self.util.increment_counter("ritalinis") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command(aliases=['gc']) + async def grilledcheese(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Grilled Cheese! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + await ctx.respond(f"*hands **{recipient_normal}** a grilled cheese*") + await self.util.increment_counter("grilled_cheeses") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def soup(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Soup! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + await ctx.respond(f"*hands **{recipient_normal}** a hot bowl of soup*") + await self.util.increment_counter("soups") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def muffin(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Muffins! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient = discord.utils.escape_mentions(recipient.strip()) + try: + response = await ctx.respond(f"*hands **{recipient_normal}** a muffin*") + await response.add_reaction(emoji="<:muffin:1314233635586707456>") + await self.util.increment_counter("muffins") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def bacon(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Bacon! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + response = await ctx.respond(f"*hands **{recipient_normal}** a side of bacon*") + await response.add_reaction(emoji="🥓") + await self.util.increment_counter("bacon_sides") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def hang(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Hang someone! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.author.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + response = await ctx.respond(f"*sends **{recipient_normal}** to the Gallows to be hanged asynchronely*") + await self.util.increment_counter("hangings") + try: + return await response.add_reaction(emoji="☠️") + except Exception as e: + logging.debug("Failed to add hang reaction: %s", + str(e)) + except Exception as e: + await ctx.respond(f"Failed: {str(e)}") + traceback.print_exc() + return + + + @bridge.bridge_command() + async def touch(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Touch someone! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + no_self_touch: str = ", don't fucking touch yourself here. You disgust me." + + if recipient is None: + recipient_normal: str = ctx.author.mention + await ctx.respond(f"{recipient_normal}{no_self_touch}") + try: + await ctx.message.add_reaction(emoji="🤮") + except Exception as e: + logging.debug("Failed to add puke reactin for touch command: %s", + str(e)) + return await self.util.increment_counter("touch_denials") + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + response = await ctx.respond(f"*touches **{recipient_normal}** for **{ctx.author.mention}** because they wouldn't touch them with a shitty stick!*") + await self.util.increment_counter("touches") + try: + return await response.add_reaction(emoji="👉") + except Exception as e: + logging.debug("Failed to add touch reaction: %s", + str(e)) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + async def qajoke(self, ctx) -> None: + """ + Get a joke in Q/A Form! + Args: + ctx (Any) + Returns: + None + """ + try: + (question, answer) = await self.util.get_qajoke() + escaped_question = discord.utils.escape_markdown(question) + escasped_answer = discord.utils.escape_markdown(answer) + embed: discord.Embed = discord.Embed(title=escaped_question, + description=escasped_answer) + return await ctx.respond(embed=embed) + except Exception as e: + await ctx.respond(f"Error: {str(e)}") + + @bridge.bridge_command() + @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + async def rjoke(self, ctx) -> None: + """ + Get a joke! (from r/jokes scrape) + Args: + ctx (Any) + Returns; + None + """ + try: + (title, body, score) = await self.util.get_rjoke() + escaped_title = discord.utils.escape_markdown(title) + escaped_body = discord.utils.escape_markdown(body) + embed: discord.Embed = discord.Embed(title=escaped_title, + description=escaped_body) + embed.add_field(name="Score", value=score) + return await ctx.respond(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + @bridge.bridge_command() + @is_spamchan_or_drugs() # pylint: disable=too-many-function-args + async def joint(self, ctx, *, + recipient: Optional[str] = None) -> None: + """ + Joints! + Args: + ctx (Any) + recipient (Optional[str]) + Returns: + None + """ + authorDisplay: str = ctx.author.display_name if not(ctx.author.display_name is None)\ + else ctx.message.author.display_name + + if recipient is None: + recipient: str = authorDisplay.strip() + recipient_normal: str = ctx.user.mention + else: + recipient_normal: str = recipient + if discord.utils.raw_mentions(recipient): + # There are mentions + recipient_id: int = discord.utils.raw_mentions(recipient)[0] # First mention + recipient: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).display_name + recipient_normal: str = self.bot.get_guild(ctx.guild.id)\ + .get_member(recipient_id).mention + else: + recipient: str = discord.utils.escape_mentions(recipient.strip()) + try: + (choice_strain, choice_desc) = await self.util.get_strain() + escaped_description = discord.utils.escape_markdown(choice_desc.strip()) + await ctx.send_followup(f"*hands **{recipient_normal}** a joint rolled up with some **{choice_strain}***", username="Joint Granter") + embed: discord.Embed = discord.Embed(title=choice_strain, + description=f"*{escaped_description}*") + await self.util.increment_counter("joints") + return await ctx.send_followup(embed=embed, username="Joint Granter") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def isitfriday(self, ctx) -> None: + """ + IS IT FRIDAY!? + Args: + ctx (Any) + Returns: + None + """ + try: + today: datetime.datetime = datetime.datetime.today() + today_weekday: int = today.weekday() + if int(today_weekday) == 4: + return await ctx.respond("# FUCK YEAH IT'S FRIDAY!") + else: + return await ctx.respond("# IT AINT FUCKIN FRIDAY! GOD DAMN IT!") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command(aliases=['flyday']) + async def isitflyday(self, ctx) -> None: + """ + IS IT FLYDAY!? + Args: + ctx (Any) + Returns: + None + """ + try: + today: datetime.datetime = datetime.datetime.today() + today_weekday: int = today.weekday() + + if int(today_weekday) == 1: + return await ctx.respond("# FUCK YEAH IT'S FLYDAY!") + else: + return await ctx.respond("# IT AINT FUCKIN FLYDAY! GOD DAMN IT!\n-# Pony may turn into a fly anyway, never any guarantees") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Failed: {str(e)}") + + @bridge.bridge_command() + async def ud(self, ctx, *, + term: str) -> None: + """ + Not sure what that term means? UD prolly knows. + Args: + ctx (Any) + term (str) + Returns: + None + """ + try: + with ctx.channel.typing(): + (ud_word, ud_def) = await self.util.get_ud_def(term=term) + term_encoded: str = urllib.parse.quote(ud_word, encoding="utf-8") + ud_link = f"https://urbandictionary.com/define.php?term={term_encoded}" + embed: discord.Embed = discord.Embed(title=ud_word.strip(), + description=f"{ud_def.strip()}\n{ud_link}") + return await ctx.respond(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"Error: {str(e)}") + + """ User Commands """ + + @commands.user_command(name="Give Joint") + @is_spamchan() # pylint: disable=too-many-function-args + async def joint_context_menu(self, ctx, member: discord.Member) -> None: + """ + Joint Context Menu + Args: + ctx (Any) + member (discord.Member) + Returns: + None + """ + (choice_strain, choice_desc) = await self.util.get_strain() + escaped_desc = discord.utils.escape_markdown(choice_desc.strip()) + await ctx.interaction.respond(f"*hands **<@{member.id}>** a joint rolled up with some **{choice_strain}***") + embed: discord.Embed = discord.Embed(title=choice_strain, + description=f"*{escaped_desc}*") + await self.util.increment_counter("joints") + return await ctx.send(embed=embed) + + @commands.user_command(name="Give Coffee") + async def coffee_context_menu(self, ctx, member: discord.Member) -> None: + """ + Coffee Context Menu + Args: + ctx (Any) + member (discord.Member) + Returns: + None + """ + chosen_coffee = self.util.get_coffee() + response = await ctx.interaction.respond(f"*hands <@{member.id}> {chosen_coffee}*") + await self.util.increment_counter("coffees") + try: + await response.add_reaction(emoji="☕") + except: + pass + + def cog_unload(self) -> None: + """Run on Cog Unload""" + try: + self.randstat_loop.cancel() + except Exception as e: + logging.debug("Failed to cancel randstat loop: %s", str(e)) + +def setup(bot) -> None: + """Run on Cog Load""" + bot.add_cog(Misc(bot)) \ No newline at end of file diff --git a/cogs/misc_util.py b/cogs/misc_util.py new file mode 100644 index 0000000..f3346c4 --- /dev/null +++ b/cogs/misc_util.py @@ -0,0 +1,388 @@ +#!/usr/bin/env python3.12 + +import os +import logging +import traceback +import random +import datetime +import pytz +from typing import Optional, LiteralString +import regex +import aiosqlite as sqlite3 +from aiohttp import ClientSession, ClientTimeout +from bohancompliment import ComplimentGenerator +from discord import Embed + +class Util: + """Misc Utility""" + def __init__(self) -> None: + self.URL_URBANDICTIONARY: str = "http://api.urbandictionary.com/v0/define" + self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult" + self.COMPLIMENT_GENERATOR = ComplimentGenerator() + self.dbs: dict[str|LiteralString] = { + 'whisky': os.path.join("/usr/local/share", + "sqlite_dbs", "whiskey.db"), + 'drinks': os.path.join("/usr/local/share", + "sqlite_dbs", "cocktails.db"), + 'strains': os.path.join("/usr/local/share", + "sqlite_dbs", "strains.db"), + 'qajoke': os.path.join("/usr/local/share", + "sqlite_dbs", "qajoke.db"), + 'rjokes': os.path.join("/usr/local/share", + "sqlite_dbs", "rjokes.db"), + 'randmsg': os.path.join("/usr/local/share", + "sqlite_dbs", "randmsg.db"), + 'stats': os.path.join("/usr/local/share", + "sqlite_dbs", "havoc_stats.db"), + 'cookies': os.path.join("/usr/local/share", + "sqlite_dbs", "cookies.db"), + } + self.COFFEES: list = ['a cup of french-pressed coffee', 'a cup of cold brew', 'a cup of flash brew', + 'a cup of Turkish coffee', 'a cup of Moka', 'an espresso', + 'a cup of Nescafe coffee', + 'an iced coffee', 'a Frappé', 'a freddo cappuccino', + 'a cup of Chock full o\'Nuts', 'a cup of Folgers', 'a cup of Lavazza', + 'a cup of Maxwell House', 'a cup of Moccona', 'a cup of Mr. Brown Coffee', + 'a cup of affogato al caffè', + 'a cup of Caffè Medici', 'a cup of Café Touba', + 'a double-double', 'an indian filter coffee', 'a cup of pocillo', + 'a cup of caffè americano', 'a cup of caffè lungo', 'a latte', 'a manilo', + 'a flat white', 'a cup of café cubano', 'a cup of caffè crema', + 'a cup of cafe zorro', 'an espresso roberto', 'an espresso romano', + 'an espresso sara', 'a guillermo', 'a ristretto', 'a cup of melya', + 'a cup of caffè marocchino', 'a cup of café miel', 'a cup of café de olla', + 'a Mazagran', 'a Palazzo', 'an ice shot'] + self.LAST_5_COFFEES: list = [] + + + def tdTuple(self, td:datetime.timedelta) -> tuple: + """ + Create TimeDelta Tuple + Args: + td (datetime.timedelta) + Returns: + tuple + """ + def _t(t, n): + if t < n: + return (t, 0) + v = t//n + return (t - (v * n), v) + (s, h) = _t(td.seconds, 3600) + (s, m) = _t(s, 60) + (mics, mils) = _t(td.microseconds, 1000) + return (td.days, h, m, s, mics, mils) + + def sqlite_dict_factory(self, cursor: sqlite3.Cursor, row: sqlite3.Row) -> dict: + """ + SQLite Dict Factory for Rows Returned + Args: + cursor (sqlite3.Row) + row (sqlite3.Row) + Returns: + dict + """ + fields = [column[0] for column in cursor.description] + return { key: value for key, value in zip(fields, row) } + + + async def get_counter(self, counter: Optional[str] = None) -> dict: + """ + Get Counter + Args: + counter (Optional[str]) + Returns: + dict + """ + async with sqlite3.connect(self.dbs.get('stats'), + timeout=3) as db_conn: + db_conn.row_factory = self.sqlite_dict_factory + query: str = "SELECT ? FROM stats LIMIT 1" + if not counter: + query: str = "SELECT * FROM stats LIMIT 1" + async with await db_conn.execute(query, (counter,) if counter else None) as db_cursor: + result: dict = await db_cursor.fetchone() + return result + + async def get_stats_embed(self) -> Embed: + """ + Get Stats Embed + Returns: + Embed + """ + counters: dict = await self.get_counter() + embed: Embed = Embed(title="Stats") + counter_message: str = "" + counters_sorted: dict = dict(sorted(counters.items(), + key=lambda item: item[1], reverse=True)) + for counter, value in counters_sorted.items(): + counter: str = regex.sub(r'_', ' ', + counter.strip()).title() + counter_message += f"- {value} {counter}\n" + embed.description = counter_message.strip() + return embed + + async def increment_counter(self, counter: str) -> bool: + """ + Increment Counter + Args: + counter (str) + Returns: + bool + """ + async with sqlite3.connect(self.dbs.get('stats'), + timeout=3) as db_conn: + async with await db_conn.execute(f"UPDATE stats SET {counter} = {counter} + 1") as db_cursor: + if db_cursor.rowcount < 0: + logging.critical("[karma::increment_counter] Fail! %s", db_cursor.rowcount) + return False + await db_conn.commit() + return True + + async def get_ud_def(self, term: Optional[str] = None) -> tuple[str, str]: + """ + Get Definition from UD + Args: + term (Optional[str]) + Returns: + tuple[str, str] + """ + try: + async with ClientSession() as session: + async with await session.get(self.URL_URBANDICTIONARY, + params={ + "term": term, + }, + headers = { + 'content-type': 'application/json; charset=utf-8', + }, timeout=ClientTimeout(connect=5, sock_read=5)) as request: + logging.debug("UD returned: %s", + await request.text()) + data: dict = await request.json() + if "list" in data: + definitions: list[dict] = data["list"] + if definitions: + definition: dict = definitions[0] + definition_word: str = definition.get("word", "N/A") + definition_text: str = regex.sub(r'(\r|\n|\r\n)', ' ', definition["definition"].strip()) + return (definition_word, definition_text) # Tuple: Returned word, returned definition + else: + return (term, "Not found!") + else: + return (term, "Error retrieving data from Urban Dictionary") + except Exception as e: + traceback.print_exc() + return (term, f"ERR: {str(e)}") + + async def get_insult(self, recipient: str) -> str: + """ + Get Insult + Args: + recipient (str) + Returns: + str + """ + async with ClientSession() as session: + async with await session.get(f"{self.URL_INSULTAPI}?who={recipient}") as request: + request.raise_for_status() + return await request.text() + + + async def get_compliment(self, subject: str, + language: Optional[str] = None) -> str: + """ + Get Compliment + Args: + subject (str) + language (Optional[str]) + Returns: + str + """ + if not language: + return self.COMPLIMENT_GENERATOR.compliment(subject) + return self.COMPLIMENT_GENERATOR.compliment_in_language(subject, language) + + async def get_whisky(self) -> tuple: + """ + Get Whisky + Returns: + tuple + """ + whisky_db: str|LiteralString = self.dbs.get('whisky') + db_conn = await sqlite3.connect(database=whisky_db, timeout=2) + db_query: str = "SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1" + db_cursor: sqlite3.Cursor = await db_conn.execute(db_query) + db_result: tuple = await db_cursor.fetchone() + + (name, category, description) = db_result + name: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '', + regex.sub(r'\p{White_Space}{2,}', ' ', + name.strip())) + category: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '', + regex.sub(r'\p{White_Space}{2,}', ' ', + category.strip())) + description: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '', + regex.sub(r'\p{White_Space}{2,}', ' ', + description.strip())) + return (name, category, description) + + async def get_drink(self) -> tuple: + """ + Get Drink + Returns: + tuple + """ + drinks_db: str|LiteralString = self.dbs.get('drinks') + db_conn = await sqlite3.connect(database=drinks_db, timeout=2) + db_query: str = "SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1" + db_cursor: sqlite3.Cursor = await db_conn.execute(db_query) + db_result: tuple = await db_cursor.fetchone() + + (name, ingredients) = db_result + name = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', name.strip())) + ingredients = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', ingredients.strip())) + ingredients = regex.sub(r'\*', '\u2731', ingredients.strip()) + + return (name, ingredients) + + async def get_strain(self, strain: Optional[str] = None) -> tuple: + """ + Get Strain + Args: + strain (Optional[str]) + Returns: + tuple + """ + strains_db: str|LiteralString = self.dbs.get('strains') + db_conn = await sqlite3.connect(database=strains_db, timeout=2) + db_params: Optional[tuple] = None + if not strain: + db_query: str = "SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1" + else: + db_query: str = "SELECT name, description FROM strains_w_desc WHERE name LIKE ?" + db_params: tuple = (f"%{strain.strip()}%",) + + db_cursor: sqlite3.Cursor = await db_conn.execute(db_query, db_params) + db_result: tuple = await db_cursor.fetchone() + + return db_result + + async def get_qajoke(self) -> tuple: + """ + Get QA Joke + Returns: + tuple + """ + qajoke_db: str|LiteralString = self.dbs.get('qajoke') + async with sqlite3.connect(database=qajoke_db, timeout=2) as db: + async with await db.execute('SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1') as cursor: + (question, answer) = await cursor.fetchone() + return (question, answer) + return None + + async def get_rjoke(self) -> tuple: + """ + Get r/joke Joke + Returns: + tuple + """ + rjokes_db: str|LiteralString = self.dbs.get('rjokes') + async with sqlite3.connect(database=rjokes_db, timeout=2) as db: + async with await db.execute('SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1') as cursor: + (title, body, score) = await cursor.fetchone() + return (title, body, score) + return None + + + async def get_random_fact(self) -> str: + """ + Get Random Fact + Returns: + str + """ + try: + facts_api_url: str = "https://uselessfacts.jsph.pl/api/v2/facts/random" + facts_backup_url: str = "https://cnichols1734.pythonanywhere.com/facts/random" + async with ClientSession() as client: + try: + async with await client.get(facts_api_url, + timeout=ClientTimeout(connect=5, sock_read=5)) as request: + json: dict = await request.json() + fact: str = json.get('text') + if not fact: + raise BaseException("RandFact Src 1 Failed") + return fact + except: + async with await client.get(facts_backup_url, + timeout=ClientTimeout(connect=5, sock_read=5)) as request: + json: dict = await request.json() + fact: str = json.get('fact') + return fact + except Exception as e: + traceback.print_exc() + return f"Failed to get a random fact :( [{str(e)}]" + + async def get_cookie(self) -> dict: + """ + Get Cookie + Returns: + dict + """ + async with sqlite3.connect(self.dbs.get('cookies'), timeout=2) as db_conn: + async with await db_conn.execute("SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1") as db_cursor: + (name, origin, image_url) = await db_cursor.fetchone() + return { + 'name': name, + 'origin': origin, + 'image_url': image_url + } + + + def get_coffee(self) -> str: + """ + Get Coffee + Returns: + str + """ + try: + randomCoffee: str = random.choice(self.COFFEES) + if self.LAST_5_COFFEES and randomCoffee in self.LAST_5_COFFEES: + return self.get_coffee() # Recurse + if len(self.LAST_5_COFFEES) >= 5: + self.LAST_5_COFFEES.pop() # Store no more than 5 of the last served coffees + self.LAST_5_COFFEES.append(randomCoffee) + return randomCoffee + except: + traceback.print_exc() + return False + + def get_days_to_xmas(self) -> tuple[int|float]: + """ + Get # of Days until Xmas + Returns: + tuple[int|float] + """ + today: datetime = datetime.datetime.now(tz=pytz.UTC) + xmas: datetime = datetime.datetime( + year=today.year, + month=12, + day=25, + tzinfo=pytz.UTC, + ) + td: datetime.timedelta = (xmas - today) # pylint: disable=superfluous-parens + days, hours, minutes, seconds, us, ms = self.tdTuple(td) + + return (days, hours, minutes, seconds, ms, us) + + async def get_randmsg(self) -> str: + """ + Get Random Message from randmsg.db + Returns: + str + """ + randmsg_db = self.dbs.get('randmsg') + async with sqlite3.connect(database=randmsg_db, + timeout=2) as db_conn: + async with await db_conn.execute("SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1") as db_cursor: + (result,) = await db_cursor.fetchone() + return result \ No newline at end of file diff --git a/cogs/owner.py b/cogs/owner.py new file mode 100644 index 0000000..ebd3ae6 --- /dev/null +++ b/cogs/owner.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python3.12 +# pylint: disable=bare-except, broad-exception-caught + +import io +import random +import asyncio +import traceback +from typing import Optional +import discord +import requests +from discord.ext import bridge, commands +import util + +class Owner(commands.Cog): + """Owner Cog for Havoc""" + def __init__(self, bot) -> None: + self.bot: discord.Bot = bot + self.former_roles_store: dict = {} + self._temperature: int = random.randrange(20, 30) + + @bridge.bridge_command(guild_ids=[1145182936002482196]) + async def temperature(self, ctx, temp: Optional[int|str] = None) -> None: + """ + Set Temperature + Args: + ctx (Any): Discord context + temperature (Optional[int|str]): New temperature + Returns: + None + """ + if not temp: + return await ctx.respond(f"The current temperature is: {self._temperature} °C") + + if not self.bot.is_owner(ctx.author): + return await ctx.respond("I am afraid I can't let you do that.") + try: + _temperature: int = int(temp) + except: + return await ctx.respond("Invalid input") + if _temperature < -15: + return await ctx.respond("Too cold! (-15°C minimum)") + elif _temperature > 35: + return await ctx.respond("Too hot! (35°C maximum)") + self._temperature: int = _temperature + return await ctx.respond(f"As per your request, I have adjusted the temperature to {_temperature} °C.") + + @bridge.bridge_command() + @commands.is_owner() + async def reload(self, ctx) -> None: + """ + Reload Cogs + Args: + ctx (Any): Discord context + Returns: + None + """ + self.bot.load_exts(False) + await ctx.respond("Reloaded!", ephemeral=True) + + + @bridge.bridge_command() + @commands.is_owner() + async def say(self, ctx, *, + parameters: str) -> None: + """ + Make me say something in a channel + Args: + ctx (Any): Discord context + parameters (str): Channel Message + Returns: + None + """ + parameters: list[str] = parameters.split(" ") + + if not len(parameters) > 1: + return await ctx.respond("**Error**: Incorrect command usage; required: ", ephemeral=True) + + channel: str = parameters[0] + channel_mentions: list[str] = discord.utils.raw_channel_mentions(channel) + if channel_mentions: + channel: str = str(channel_mentions[0]) + msg: str = " ".join(parameters[1:]) + sent = await util.discord_helpers.send_message(self.bot, channel=channel, + message=msg) + if not sent: + return await ctx.respond("**Failed.**", ephemeral=True) + return await ctx.respond("**Done.**", ephemeral=True) + + @bridge.bridge_command() + @commands.is_owner() + async def chgstatus(self, ctx, *, + status: Optional[str] = None) -> None: + """ + Change bots status + Args: + ctx (Any): Discord context + status (Optional[str]): The new status to set + Returns: + None + """ + if not status: + return await ctx.respond("ERR: No status provided to change to!", + ephemeral=True) + + await self.bot.change_presence(status=discord.Status.online, + activity=discord.CustomActivity(name=status.strip())) + await ctx.respond("Done!", ephemeral=True) + + @commands.message_command(name="Remove Messages Starting Here") + @commands.is_owner() + async def purge(self, ctx, message: discord.Message) -> None: + """ + Purge Messages + Args: + ctx (Any): Discord context + message (discord.Message): Discord message + Returns: + None + """ + try: + await ctx.channel.purge(after=message, + bulk=True, + limit=900000, + reason=f"Purge initiated by {ctx.author.display_name}") + await message.delete(reason=f"Purge initiated by {ctx.author.display_name}") + await ctx.respond("**Done!**") + # Wait 3 seconds, then delete interaction + await asyncio.sleep(3) + interaction = await ctx.interaction.original_response() + await interaction.delete() + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"**ERR: {str(e)}**") + + @commands.message_command(name="Move to Memes") + @commands.is_owner() + async def movememe(self, ctx, message: discord.Message) -> None: + """ + Move to Memes + Args: + ctx (Any): Discord context + message (discord.Message): Discord message + Returns: + None + """ + try: + memes_channel: discord.TextChannel = ctx.guild.get_channel(1147229098544988261) + message_content: str = message.content + message_author: str = message.author.display_name + message_channel: str = message.channel.name + _file: Optional[discord.File] = None + if message.attachments: + for item in message.attachments: + if item.url and len(item.url) >= 20: + image: io.BytesIO = io.BytesIO(requests.get(item.url, stream=True, + timeout=20).raw.read()) + ext: str = item.url.split(".")[-1]\ + .split("?")[0].split("&")[0] + _file: discord.File = discord.File(image, filename=f'img.{ext}') + await memes_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...*\n**{message_author}:** {message_content}", file=_file) + await message.delete() + await ctx.respond("OK!", ephemeral=True) + except: + traceback.print_exc() + return await ctx.respond("Failed! :(", ephemeral=True) + + @commands.message_command(name="Move to Drugs") + @commands.is_owner() + async def movedrugs(self, ctx, message: discord.Message) -> None: + """ + Move to Drugs + Args: + ctx (Any): Discord context + message (discord.Message): Discord message + Returns: + None + """ + try: + drugs_channel: discord.TextChannel = ctx.guild.get_channel(1172247451047034910) + message_content: str = message.content + message_author: str = message.author.display_name + message_channel: str = message.channel.name + _file: Optional[discord.File] = None + if message.attachments: + for item in message.attachments: + if item.url and len(item.url) >= 20: + image: io.BytesIO = io.BytesIO(requests.get(item.url, stream=True, + timeout=20).raw.read()) + ext: str = item.url.split(".")[-1]\ + .split("?")[0].split("&")[0] + _file: discord.File = discord.File(image, filename=f'img.{ext}') + await drugs_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...\ + *\n**{message_author}:** {message_content}", file=_file) + await message.delete() + await ctx.respond("OK!", ephemeral=True) + except: + traceback.print_exc() + return await ctx.respond("Failed! :(", ephemeral=True) + + @commands.message_command(name="Move to fun-house") + @commands.is_owner() + async def movefunhouse(self, ctx, message: discord.Message) -> None: + """ + Move to fun-house + Args: + ctx (Any): Discord context + message (discord.Message): Discord message + Returns: + None + """ + try: + funhouse_channel: discord.TextChannel = ctx.guild.get_channel(1213160512364478607) + message_content: str = message.content + message_author: str = message.author.display_name + message_channel: str = message.channel.name + _file: Optional[discord.File] = None + if message.attachments: + for item in message.attachments: + if item.url and len(item.url) >= 20: + image: io.BytesIO = io.BytesIO(requests.get(item.url, stream=True, + timeout=20).raw.read()) + ext: str = item.url.split(".")[-1]\ + .split("?")[0].split("&")[0] + _file: discord.File = discord.File(image, filename=f'img.{ext}') + await funhouse_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})\ + ...*\n**{message_author}:** {message_content}") + await message.delete() + await ctx.respond("OK!", ephemeral=True) + except: + traceback.print_exc() + return await ctx.respond("Failed! :(", ephemeral=True) + + @commands.user_command(name="Einsperren!", guild_ids=[145182936002482196]) + @commands.is_owner() + async def einsperren(self, ctx, member: discord.Member) -> None: + """ + Einsperren! + Args: + ctx (Any): Discord context + member (discord.Member): Discord member + Returns: + None + """ + try: + if not ctx.guild.id == 1145182936002482196: + return # Not home server! + audit_reason: str = f"Einsperren von {ctx.user.display_name}" + member: discord.Member = ctx.guild.get_member(member.id) + member_display: str = member.display_name + einsperren_role: discord.Role = ctx.guild.get_role(1235415059300093973) if ctx.guild.id != 1145182936002482196\ + else ctx.guild.get_role(1235406301614309386) + member_roles: list[discord.Role] = [role for role in member.roles if not role.name == "@everyone"] + member_role_names: list[str] = [str(role.name).lower() for role in member_roles] + opers_chan: discord.TextChannel = ctx.guild.get_channel(1181416083287187546) + if not "einsperren" in member_role_names: + try: + if member.id in self.former_roles_store: + self.former_roles_store.pop(member.id) + self.former_roles_store[member.id] = member.roles + except: + pass # Safe to ignore + try: + await member.edit(roles=[einsperren_role], reason=audit_reason) + await ctx.respond(f"Gesendet {member_display} an einsperren.", ephemeral=True) + await opers_chan.send(f"@everyone: {ctx.user.display_name} gesendet {member_display} an einsperren.") + except: + traceback.print_exc() + return await ctx.respond("GOTTVERDAMMT!!", ephemeral=True) + self.former_roles_store[member.id] = member.roles + + if not member.id in self.former_roles_store: + await member.edit(roles=[]) # No roles + else: + former_roles: list[discord.Role] = self.former_roles_store.get(member.id) + await member.edit(roles=former_roles, reason=f"De-{audit_reason}") + + await ctx.respond(f"{member_display} wurde von der Einsperre befreit.", ephemeral=True) + await opers_chan.send(f"{member_display} wurde von {ctx.user.display_name} aus der Einsperre befreit.") + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"ERR: {str(e)}", ephemeral=True) + +def setup(bot) -> None: + """Run on Cog Load""" + bot.add_cog(Owner(bot)) \ No newline at end of file diff --git a/cogs/quote.py b/cogs/quote.py new file mode 100644 index 0000000..a38ddba --- /dev/null +++ b/cogs/quote.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python3.12 +# pylint: disable=bare-except, broad-exception-caught + +""" +Quote cog for Havoc +""" + +import traceback +import time +import os +import datetime +import asyncio +import discord +import aiosqlite as sqlite3 +from discord.ext import bridge, commands + +class DB: + """DB Utility for Quote Cog""" + def __init__(self, bot): + self.bot = bot + self.db_path = os.path.join("/", "usr", "local", "share", + "sqlite_dbs", "quotes.db") + self.hp_chanid = 1157529874936909934 + + + async def get_quote_count(self): + """Get Quote Count""" + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + async with await db_conn.execute("SELECT COUNT (*) FROM quotes") as db_cursor: + result = await db_cursor.fetchone() + return result[-1] + + async def remove_quote(self, quote_id: int): + """Remove Quote from DB""" + try: + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + async with await db_conn.execute("DELETE FROM quotes WHERE id = ?", (quote_id,)) as _: + await db_conn.commit() + return True + except: + await self.bot.get_channel(self.hp_chanid).send(traceback.format_exc()) + return False + + async def add_quote(self, message_id: int, channel_id: int, + quoted_member_id: int, + message_time: int, + quoter_friendly: str, + quoted_friendly: str, + channel_friendly: str, + message_content: str, + ): + """Add Quote to DB""" + params = ( + quoter_friendly, + int(time.time()), + quoted_friendly, + quoted_member_id, + channel_friendly, + channel_id, + message_id, + message_time, + quoter_friendly, + message_content, + ) + + try: + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + # pylint: disable=line-too-long + db_conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)]) + async with await db_conn.execute("INSERT INTO quotes (added_by, added_at, quoted_user_display, quoted_user_memberid, quoted_channel_display, quoted_channel_id, quoted_message_id, quoted_message_time, added_by_friendly, quoted_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + params) as _: # pylint: enable=line-too-long + await db_conn.commit() + return True + except: + return traceback.format_exc() + + async def fetch_quote(self, random: bool = False, quoteid: int = None, added_by: str = None, quoted_user: str = None, content: str = None): + """Fetch Quote from DB""" + try: + query_head = "SELECT id, added_by_friendly, added_at, quoted_user_display, quoted_channel_display, quoted_message_time, quoted_message FROM quotes" + query = "" + params = None + + if random: + query = f"{query_head} ORDER BY RANDOM() LIMIT 1" + elif quoteid: + query = f"{query_head} WHERE id = ? LIMIT 1" + params = (quoteid,) + elif added_by: + query = f"{query_head} WHERE added_by_friendly LIKE ? ORDER BY RANDOM() LIMIT 5" + params = (f"%{added_by}%",) + elif quoted_user: + query = f"{query_head} WHERE quoted_user_display LIKE ? ORDER BY RANDOM() LIMIT 5" + params = (f"%{quoted_user}%",) + elif content: + query = f"{query_head} WHERE quoted_message LIKE ? ORDER BY RANDOM() LIMIT 5" + params = (f"%{content}%",) + + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + db_conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)]) + async with await db_conn.execute(query, params) as db_cursor: + results = await db_cursor.fetchall() + if not results: + return { + 'err': 'No results for query', + } + if random or quoteid: + chosen = results[-1] + return { + str(k): v for k,v in chosen.items() + } + else: + return [ + { str(k): v for k,v in _.items() } + for _ in results + ] + except: + return traceback.format_exc() + + +class Quote(commands.Cog): + """Quote Cog for Havoc""" + def __init__(self, bot): + self.bot = bot + self.db = DB(self.bot) + + def is_homeserver(): # pylint: disable=no-method-argument + """Check if channel/interaction is within homeserver""" + def predicate(ctx): + try: + return ctx.guild.id == 1145182936002482196 + except: + traceback.print_exc() + return False + return commands.check(predicate) + + @commands.message_command(name="Add Quote") + async def add_quote(self, ctx, message: discord.Message): + """Add A Quote""" + hp_chanid = 1157529874936909934 + try: + if message.author.bot: + return await ctx.respond("Quotes are for real users, not bots.", ephemeral=True) + quoter_friendly = ctx.author.display_name + quoted_message_id = message.id + quoted_channel_friendly = f'#{ctx.channel.name}' + quoted_channel_id = ctx.channel.id + message_content = message.content + message_author_friendly = message.author.display_name + message_author_id = message.author.id + message_time = int(message.created_at.timestamp()) + message_escaped = discord.utils.escape_mentions(discord.utils.escape_markdown(message_content)).strip() + + if len(message_escaped) < 3: + return await ctx.respond("**Error**: Message (text content) is not long enough to quote.", ephemeral=True) + + if len(message_escaped) > 512: + return await ctx.respond("**Error**: Message (text content) is too long to quote.", ephemeral=True) + + result = await self.db.add_quote(message_id=quoted_message_id, + channel_id=quoted_channel_id, + quoted_member_id=message_author_id, + message_time=message_time, + quoter_friendly=quoter_friendly, + quoted_friendly=message_author_friendly, + channel_friendly=quoted_channel_friendly, + message_content=message_content) + if not result: + return await ctx.respond("Failed!", ephemeral=True) + else: + return await ctx.respond("OK!", ephemeral=True) + except: + await self.bot.get_channel(hp_chanid).send(traceback.format_exc()) + + + @bridge.bridge_command(aliases=['rand']) + @is_homeserver() # pylint: disable=too-many-function-args + async def randquote(self, ctx): + """Get a random quote""" + try: + random_quote = await self.db.fetch_quote(random=True) + if random_quote.get('err'): + return await ctx.respond("Failed to get a quote") + + quote_id = random_quote.get('id') + quoted_friendly = random_quote.get('quoted_user_display', 'Unknown') + adder_friendly = random_quote.get('added_by_friendly', 'Unknown') + message_time = datetime.datetime.fromtimestamp(random_quote.get('quoted_message_time')) + message_channel = random_quote.get('quoted_channel_display') + quote_added_at = datetime.datetime.fromtimestamp(random_quote.get('added_at')) + quote_content = random_quote.get('quoted_message') + + embed = discord.Embed( + colour=discord.Colour.orange(), + title=f"Quote #{quote_id}", + ) + embed.description = f"**{quoted_friendly}:** {quote_content}" + embed.add_field(name="Original Message Time", value=message_time) + embed.add_field(name="Channel", value=message_channel) + embed.add_field(name="Quote ID", value=quote_id) + embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") + + return await ctx.respond(embed=embed) + except: + error = await ctx.respond(traceback.format_exc()) + await asyncio.sleep(10) + await error.delete() + + @bridge.bridge_command(aliases=['qg']) + @is_homeserver() # pylint: disable=too-many-function-args + async def quoteget(self, ctx, quoteid): + """Get a specific quote by ID""" + try: + if not str(quoteid).strip().isnumeric(): + return await ctx.respond("**Error**: Quote ID must be numeric.") + fetched_quote = await self.db.fetch_quote(quoteid=quoteid) + if fetched_quote.get('err'): + return await ctx.respond("**Error**: Quote not found") + + quote_id = fetched_quote.get('id') + quoted_friendly = fetched_quote.get('quoted_user_display', 'Unknown') + adder_friendly = fetched_quote.get('added_by_friendly', 'Unknown') + message_time = datetime.datetime.fromtimestamp(fetched_quote.get('quoted_message_time')) + message_channel = fetched_quote.get('quoted_channel_display') + quote_added_at = datetime.datetime.fromtimestamp(fetched_quote.get('added_at')) + quote_content = fetched_quote.get('quoted_message') + + embed = discord.Embed( + colour=discord.Colour.orange(), + title=f"Quote #{quote_id}", + ) + embed.description = f"**{quoted_friendly}:** {quote_content}" + embed.add_field(name="Original Message Time", value=message_time) + embed.add_field(name="Channel", value=message_channel) + embed.add_field(name="Quote ID", value=quote_id) + embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") + + return await ctx.respond(embed=embed) + except: + error = await ctx.respond(traceback.format_exc()) + await asyncio.sleep(10) + await error.delete() + + @bridge.bridge_command(aliases=['qs']) + @is_homeserver() # pylint: disable=too-many-function-args + async def quotesearch(self, ctx, *, content: str): + """Search for a quote (by content)""" + try: + found_quotes = await self.db.fetch_quote(content=content) + if isinstance(found_quotes, dict) and found_quotes.get('err'): + return await ctx.respond(f"Quote search failed: {found_quotes.get('err')}") + + embeds = [] + + for quote in found_quotes: + quote_id = quote.get('id') + quoted_friendly = quote.get('quoted_user_display', 'Unknown') + adder_friendly = quote.get('added_by_friendly', 'Unknown') + message_time = datetime.datetime.fromtimestamp(quote.get('quoted_message_time')) + message_channel = quote.get('quoted_channel_display') + quote_added_at = datetime.datetime.fromtimestamp(quote.get('added_at')) + quote_content = quote.get('quoted_message') + + # await ctx.respond(f"**{quoted_friendly}**: {quote}")ed_friendly = quote.get('quoted_user_display', 'Unknown') + adder_friendly = quote.get('added_by_friendly', 'Unknown') + message_time = datetime.datetime.fromtimestamp(quote.get('quoted_message_time')) + message_channel = quote.get('quoted_channel_display') + quote_added_at = datetime.datetime.fromtimestamp(quote.get('added_at')) + quote = quote.get('quoted_message') + + # await ctx.respond(f"**{quoted_friendly}**: {quote}") + embed = discord.Embed( + colour=discord.Colour.orange(), + title=f"Quote #{quote_id}", + ) + embed.description = f"**{quoted_friendly}:** {quote_content}" + embed.add_field(name="Original Message Time", value=message_time) + embed.add_field(name="Channel", value=message_channel) + embed.add_field(name="Quote ID", value=quote_id) + embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") + embeds.append(embed) + + return await ctx.respond(embeds=embeds) + except Exception as e: + await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") + + @bridge.bridge_command(aliases=['nq']) + @is_homeserver() # pylint: disable=too-many-function-args + async def nquotes(self, ctx): + """Get # of quotes stored""" + try: + quote_count = await self.db.get_quote_count() + if not quote_count: + return await ctx.respond("**Error**: No quotes found!") + return await ctx.respond(f"I currently have **{quote_count}** quotes stored.") + + except Exception as e: + await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") + + @bridge.bridge_command(aliases=['qr']) + @commands.is_owner() + @is_homeserver() # pylint: disable=too-many-function-args + async def quoteremove(self, ctx, quoteid): + """Remove a quote (by id) + Owner only""" + try: + if not str(quoteid).strip().isnumeric(): + return await ctx.respond("**Error**: Quote ID must be numeric.") + quoteid = int(quoteid) + remove_quote = await self.db.remove_quote(quoteid) + if not remove_quote: + return await ctx.respond("**Error**: Failed!", ephemeral=True) + return await ctx.respond("Removed!", ephemeral=True) + + except Exception as e: + await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") + +def setup(bot): + """Run on Cog Load""" + bot.add_cog(Quote(bot)) diff --git a/cogs/radio.py b/cogs/radio.py new file mode 100644 index 0000000..8852c15 --- /dev/null +++ b/cogs/radio.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3.12 +# pylint: disable=bare-except, broad-exception-caught, invalid-name + +import logging +import traceback +from discord.ext import bridge, commands, tasks +import discord + +class Radio(commands.Cog): + """Radio Cog for Havoc""" + def __init__(self, bot: discord.Bot) -> None: + self.bot: discord.Bot = bot + self.channels: dict[tuple] = { + 'sfm': (1145182936002482196, 1221615558492029050), # Tuple: Guild Id, Chan Id + } + self.STREAM_URL: str = "https://relay.sfm.codey.lol/aces.ogg" + + try: + self.radio_state_loop.cancel() + except Exception as e: + logging.debug("Failed to cancel radio_state_loop: %s", + str(e)) + + @commands.Cog.listener() + async def on_ready(self) -> None: + """Run on Bot Ready""" + await self.radio_init() + + def is_radio_chan(): # pylint: disable=no-method-argument + """Check if channel is radio chan""" + def predicate(ctx): + try: + return ctx.channel.id == 1221615558492029050 + except: + traceback.print_exc() + return False + return commands.check(predicate) + + @bridge.bridge_command() + @commands.is_owner() + async def reinitradio(self, ctx) -> None: + """ + Reinitialize serious.FM + Args: + ctx (Any): Discord context + Returns: + None + """ + loop: discord.asyncio.AbstractEventLoop = self.bot.loop + loop.create_task(self.radio_init()) + await ctx.respond("Done!", ephemeral=True) + + async def radio_init(self) -> None: + """ + Init Radio + """ + try: + (radio_guild, radio_chan) = self.channels['sfm'] + channel: discord.TextChannel = self.bot.get_guild(radio_guild)\ + .get_channel(radio_chan) + if not self.bot.voice_clients: + await channel.connect() + try: + try: + self.radio_state_loop.cancel() + except Exception as e: + logging.debug("Failed to cancel radio_state_loop: %s", + str(e)) + self.radio_state_loop.start() + logging.info("radio_state_loop task started!") + except: + logging.critical("Could not start task...") + traceback.print_exc() + except: + traceback.print_exc() + return + + @tasks.loop(seconds=2.0) + async def radio_state_loop(self) -> None: + """Radio State Loop""" + try: + (radio_guild, radio_chan) = self.channels['sfm'] + try: + vc: discord.VoiceClient = self.bot.voice_clients[-1] + except: + logging.debug("No voice client, establishing new VC connection...") + channel = self.bot.get_guild(radio_guild)\ + .get_channel(radio_chan) + await channel.connect() + vc = self.bot.voice_clients[-1] + if not(vc.is_playing()) or vc.is_paused(): + logging.info("Detected VC not playing... playing!") + source = discord.FFmpegOpusAudio(self.STREAM_URL, + before_options="-timeout 3000000") + vc.play(source, after=lambda e: logging.info("Error: %s", e)\ + if e else None) + except: + traceback.print_exc() + + # pylint: enable=superfluous-parens + + def cog_unload(self) -> None: + """Run on Cog Unload""" + self.radio_state_loop.cancel() + +def setup(bot) -> None: + """Run on Cog Load""" + bot.add_cog(Radio(bot)) \ No newline at end of file diff --git a/cogs/sing.py b/cogs/sing.py new file mode 100644 index 0000000..4476a9f --- /dev/null +++ b/cogs/sing.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3.12 +# pylint: disable=bare-except, broad-exception-caught, global-statement, invalid-name + +import traceback +import logging +from typing import Optional, Pattern +import urllib +import discord +import regex +from util.sing_util import Utility +from discord.ext import bridge, commands + +BOT_CHANIDS = [] + +class Sing(commands.Cog): + """Sing Cog for Havoc""" + def __init__(self, bot): + self.bot: discord.Bot = bot + self.utility = Utility() + global BOT_CHANIDS + BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit + self.control_strip_regex: Pattern = regex.compile(r"\x0f|\x1f|\035|\002|\u2064|\x02|(\x03([0-9]{1,2}))|(\x03|\003)(?:\d{1,2}(?:,\d{1,2})?)?", + regex.UNICODE) + + def is_spamchan(): # pylint: disable=no-method-argument + """Check if channel is spam chan""" + def predicate(ctx): + try: + if not ctx.channel.id in BOT_CHANIDS: + logging.debug("%s not found in %s", ctx.channel.id, BOT_CHANIDS) + return ctx.channel.id in BOT_CHANIDS + except: + traceback.print_exc() + return False + return commands.check(predicate) + + @bridge.bridge_command(aliases=['sing']) + async def s(self, ctx, *, + song: Optional[str] = None) -> None: + """ + Search for lyrics, format is artist : song. Also reads activity. + Args: + ctx (Any): Discord context + song (Optional[str]): Song to search + Returns: + None + """ + try: + with ctx.channel.typing(): + interaction: bool = isinstance(ctx, + discord.ext.bridge.BridgeApplicationContext) + activity: Optional[discord.Activity] = None + if not song: + if not ctx.author.activities: + return + + for _activity in ctx.author.activities: + if _activity.type == discord.ActivityType.listening: + activity: discord.Activity = _activity + + if not activity: + return await ctx.respond("**Error**: No song specified, no activity found to read.") + + if interaction: + await ctx.respond("*Searching...*", ephemeral=True) # Must respond to interactions within 3 seconds, per Discord + + (search_artist, search_song, search_subsearch) = self.utility.parse_song_input(song, activity) + + # await ctx.respond(f"So, {search_song} by {search_artist}? Subsearch: {search_subsearch} I will try...") # Commented, useful for debugging + search_result: list[str] = await self.utility.lyric_search(search_artist, search_song, + search_subsearch) + + if len(search_result) == 1: + return await ctx.respond(search_result[0].strip()) + + (search_result_artist, search_result_song, search_result_src, + search_result_confidence, search_result_time_taken) = search_result[0] # First index is a tuple + search_result_wrapped: list[str] = search_result[1] # Second index is the wrapped lyrics + search_result_wrapped_short: list[str] = search_result[2] # Third is short wrapped lyrics + if not ctx.channel.id in BOT_CHANIDS: + search_result_wrapped: list[str] = search_result_wrapped_short # Replace with shortened lyrics for non spamchans + embeds: list[Optional[discord.Embed]] = [] + embed_url: str = f"[on codey.lol](https://codey.lol/#{urllib.parse.quote(search_artist)}/{urllib.parse.quote(search_song)})" + c: int = 0 + footer: str = "To be continued..." #Placeholder + for section in search_result_wrapped: + c+=1 + if c == len(search_result_wrapped): + footer: str = f"Found on: {search_result_src}" + section: str = self.control_strip_regex.sub('', section) + # if ctx.guild.id == 1145182936002482196: + # section = section.upper() + embed: discord.Embed = discord.Embed( + title=f"{search_result_song} by {search_result_artist}", + description=discord.utils.escape_markdown(section.replace("\n", "\n\n")) + ) + embed.add_field(name="Confidence", value=search_result_confidence, + inline=True) + embed.add_field(name="Time Taken", value=search_result_time_taken, + inline=True) + embed.add_field(name="Link", value=embed_url) + embed.set_footer(text=footer) + embeds.append(embed) + await ctx.respond(embed=embeds[0]) + for embed in embeds[1:]: + await ctx.send(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"ERR: {str(e)}") + + @commands.user_command(name="Sing") + async def sing_context_menu(self, ctx, member: discord.Member) -> None: + """ + Sing Context Menu Command + Args: + ctx (Any): Discord context + member (discord.Member): Discord member + Returns: + None + """ + try: + PODY_ID: int = 1172340700663255091 + IS_SPAMCHAN: bool = ctx.channel.id in BOT_CHANIDS + member_display = ctx.interaction.guild.get_member(member.id)\ + .display_name + if not(ctx.interaction.guild.get_member(member.id).activities)\ + and not member.id == PODY_ID: + return await ctx.respond(f"No activity detected to read for {member_display}.", ephemeral=True) + member_id: int = member.id #if not(member.id == PODY_ID) else 1234134345497837679 # Use Thomas for Pody! + activity: Optional[discord.Activity] = None + for _activity in ctx.interaction.guild.get_member(member_id).activities: + if _activity.type == discord.ActivityType.listening: + activity: discord.Activity = _activity + parsed: tuple|bool = self.utility.parse_song_input(song=None, + activity=activity) + if not parsed: + return await ctx.respond(f"Could not parse activity of {member_display}.", ephemeral=True) + if IS_SPAMCHAN: + await ctx.respond(f"***Reading activity of {member_display}...***") + + (search_artist, search_song, search_subsearch) = parsed + await ctx.respond("*Searching...*", ephemeral=True) # Must respond to interactions within 3 seconds, per Discord + search_result: list = await self.utility.lyric_search(search_artist, search_song, + search_subsearch) + + if len(search_result) == 1: + return await ctx.send(search_result[0].strip()) + + (search_result_artist, search_result_song, search_result_src, + search_result_confidence, search_result_time_taken) = search_result[0] # First index is a tuple + search_result_wrapped: list[str] = search_result[1] # Second index is the wrapped lyrics + search_result_wrapped_short: list[str] = search_result[2] # Third index is shortened lyrics + + if not IS_SPAMCHAN: + search_result_wrapped: list[str] = search_result_wrapped_short # Swap for shortened lyrics if not spam chan + + embeds: list[Optional[discord.Embed]] = [] + c: int = 0 + footer: str = "To be continued..." #Placeholder + for section in search_result_wrapped: + c+=1 + if c == len(search_result_wrapped): + footer: str = f"Found on: {search_result_src}" + # if ctx.guild.id == 1145182936002482196: + # section = section.upper() + embed: discord.Embed = discord.Embed( + title=f"{search_result_song} by {search_result_artist}", + description=discord.utils.escape_markdown(section.replace("\n", "\n\n")) + ) + embed.add_field(name="Confidence", value=search_result_confidence, inline=True) + embed.add_field(name="Time Taken", value=search_result_time_taken, inline=True) + embed.add_field(name="Link", value=f"[on codey.lol](https://codey.lol/#{urllib.parse.quote(search_result_artist)}/{urllib.parse.quote(search_result_song)})") + embed.set_footer(text=footer) + embeds.append(embed) + + for embed in embeds: + await ctx.send(embed=embed) + except Exception as e: + traceback.print_exc() + return await ctx.respond(f"ERR: {str(e)}") + +def setup(bot) -> None: + """Run on Cog Load""" + bot.add_cog(Sing(bot)) diff --git a/constructors.py b/constructors.py new file mode 100644 index 0000000..a935ce0 --- /dev/null +++ b/constructors.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3.12 + +""" +AI +""" +class AIException(Exception): + """AI Exception (generic)""" + pass + +""" +LoveHate +""" +class LoveHateException(Exception): + """Love Hate Exception (generic)""" + pass diff --git a/disc_havoc.py b/disc_havoc.py new file mode 100644 index 0000000..302a5c7 --- /dev/null +++ b/disc_havoc.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3.12 +# pylint: disable=bare-except, invalid-name, import-outside-toplevel + +import os +import logging +import importlib +import discord +import setproctitle +import hypercorn +import hypercorn.asyncio +from dotenv import load_dotenv +from discord.ext import bridge, commands +from termcolor import colored +import api + +logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(message)s', + encoding='utf-8') +setproctitle.setproctitle('disc-havoc') + +owners = [1172340700663255091, 992437729927376996] +BOT_CHANIDS = [ + 1145182936442875997, + 1157535700774834236, + 1156710277266542624, + 1179232748385341530, + 1219638300654964807, + 1193632849740439665, + 1202288798315335831, + 1157529874936909934, + 1272333206066167959, + 1228740577068322839, + 1228740577068322841, + 1324142398741151784, + ] + +cogs_list = [ + 'misc', + 'owner', + 'sing', + 'meme', + 'ai', + 'karma', + 'lovehate', + 'quote', + 'radio', +] + +bot_activity = discord.CustomActivity(name="I made cookies!") + +load_dotenv() + +intents = discord.Intents.all() +intents.message_content = True +bot = bridge.Bot(command_prefix=".", intents=intents, + owner_ids=owners, activity=bot_activity, + help_command=commands.MinimalHelpCommand()) + + +@bot.event +async def on_ready(): + """Run on Bot Ready""" + logging.info("%s online!", bot.user) + +def load_exts(initialRun=True): + """Load Cogs/Extensions""" + load_method = bot.load_extension if initialRun else bot.reload_extension + + for cog in cogs_list: + logging.info("Loading: %s", cog) + load_method(f'cogs.{cog}') + + # asyncio.get_event_loop().create_task(bot.sync_commands()) + + importlib.reload(api) + from api import API # pylint: disable=unused-import + api_config = hypercorn.config.Config() + api_config.bind = "10.10.10.100:5992" + api_instance = api.API(bot) + try: + bot.fapi_task.cancel() + except: + pass + + logging.info("Starting FAPI Task") + + bot.fapi_task = bot.loop.create_task(hypercorn.asyncio.serve(api_instance.api_app, api_config)) + +def __init__(): + logging.info(colored(f"Log level: {logging.getLevelName(logging.root.level)}", "red", attrs=['reverse'])) + bot.BOT_CHANIDS = BOT_CHANIDS + bot.load_exts = load_exts + bot.load_exts() + bot.run(os.getenv('TOKEN')) + +if __name__ == "__main__": + __init__() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5ea9346 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,13 @@ +aiosqlite==0.20.0 +beautifulsoup4==4.12.3 +edge_tts==6.1.12 +feedparser==6.0.11 +Flask==3.0.3 +nvdlib==0.7.7 +openai==1.54.3 +requests_async==0.6.2 +shazamio==0.7.0 +streamrip==2.0.5 +Unidecode==1.3.6 +Unidecode==1.3.8 +websockets==12.0 diff --git a/util/__init__.py b/util/__init__.py new file mode 100644 index 0000000..9c22854 --- /dev/null +++ b/util/__init__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3.12 + +import importlib + +from . import discord_helpers +importlib.reload(discord_helpers) \ No newline at end of file diff --git a/util/discord_helpers.py b/util/discord_helpers.py new file mode 100644 index 0000000..4ad9fd9 --- /dev/null +++ b/util/discord_helpers.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3.12 +import re +import discord +from typing import Optional, Any + +""" +Discord Helper Methods +""" + +async def get_channel_by_name(bot: discord.Bot, channel: str, + guild: int | None = None) -> Optional[Any]: # Optional[Any] used as pycord channel types can be ambigious + """ + Get Channel by Name + Args: + bot (discord.Bot) + channel (str) + guild (int|None) + Returns: + Optional[Any] + """ + channel: str = re.sub(r'^#', '', channel.strip()) + if not guild: + return discord.utils.get(bot.get_all_channels(), + name=channel) + else: + channels: list = bot.get_guild(guild).channels + for _channel in channels: + if _channel.name.lower() == channel.lower().strip(): + return _channel + return + +async def send_message(bot: discord.Bot, channel: str, + message: str, guild: int | None = None) -> None: + """ + Send Message to the provided channel. If guild is provided, will limit to channels within that guild to ensure the correct + channel is selected. Useful in the event a channel exists in more than one guild that the bot resides in. + Args: + bot (discord.Bot) + channel (str) + message (str) + guild (int|None) + Returns: + None + """ + if channel.isnumeric(): + channel: int = int(channel) + _channel = bot.get_channel(channel) + else: + channel: str = re.sub(r'^#', '', channel.strip()) + _channel = await get_channel_by_name(bot=bot, + channel=channel, guild=guild) + await _channel.send(message) diff --git a/util/lovehate_db.py b/util/lovehate_db.py new file mode 100644 index 0000000..04726b0 --- /dev/null +++ b/util/lovehate_db.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3.12 +import os +import logging +from typing import Optional, LiteralString +import aiosqlite as sqlite3 +from constructors import LoveHateException + +class DB: + """LoveHate DB Utility Class""" + def __init__(self, bot) -> None: + self.db_path: str|LiteralString = os.path.join("/usr/local/share", + "sqlite_dbs", "lovehate.db") + + async def get_wholovehates(self, thing: str, loves: bool = False, + hates: bool = False) -> list[tuple]|bool: + """ + Get a list of users who have professed their love OR hatred for + + Args: + thing (str): The to check + loves (bool): Are we looking for loves? + hates (bool): ...or are we looking for hates? + Returns: + list[tuple] + """ + + query: str = "SELECT display_name FROM lovehate WHERE thing LIKE ? AND flag = ?" + params: tuple = tuple() + flag: Optional[int] = None + + if hates and loves: + raise LoveHateException("Both hates and loves may not be True") + elif hates: + flag: int = -1 + elif loves: + flag: int = 1 + elif not hates and not loves: + raise LoveHateException("Neither loves nor hates were requested") + + params: tuple = (thing, flag,) + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + async with await db_conn.execute(query, params) as db_cursor: + result: list[tuple] = await db_cursor.fetchall() + logging.debug("Result: %s", result) + if not result: + return False + return result + + async def get_lovehates(self, loves: bool = False, hates: bool = False, + user: str = None, thing: str = None) -> list[tuple]|bool: + """ + Get a list of either 1) what {user} loves/hates, or who loves/hates {thing}, depending on bools loves, hates + Args: + loves (bool): Are we looking for loves? + hates (bool): ...OR are we looking for hates? + user (Optional[str]): the user to query against + thing (Optional[str]): ... OR the thing to query against + Returns: + list[tuple]|bool + """ + + query: str = "" + params: tuple = tuple() + + if not user and not thing: + raise LoveHateException("Neither a or was specified to query against") + + flag: Optional[int] = None + + if hates and loves: + raise LoveHateException("Both hates and loves may not be True") + elif hates: + flag: int = -1 + elif loves: + flag: int = 1 + elif not hates and not loves: + raise LoveHateException("Neither loves nor hates were requested") + + if user: + query: str = "SELECT thing FROM lovehate WHERE display_name LIKE ? AND flag == ?" + params: tuple = (user, flag,) + elif thing: + query: str = "SELECT display_name FROM lovehate WHERE thing LIKE ? AND flag == ?" + params: tuple = (thing, flag,) + + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + async with await db_conn.execute(query, params) as db_cursor: + result = await db_cursor.fetchall() + if not result: + return False + return result + + + async def check_existence(self, user: str, thing: str) -> Optional[int]: + """ + Determine whether a user is opinionated on a + Args: + user (str): The user to check + thing (str): The thing to check if the user has an opinion on + Returns: + Optional[int] + """ + + params = (user, thing,) + + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + async with await db_conn.execute("SELECT id, flag FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?", params) as db_cursor: + result = await db_cursor.fetchone() + if not result: + return None + (_, flag) = result + return flag + + async def update(self, user: str, thing: str, flag: int) -> str: + """ + Updates the lovehate database, and returns an appropriate response + Args: + user (str): The user to update + thing (str): The thing the user loves/hates/doesn't care about anymore + flag (int): int representation of love (1), hate (-1), and dontcare (0) + Returns: + str + """ + if not flag in range(-1, 2): + raise LoveHateException(f"Invalid flag {flag} specified, is this love (1), hate (-1), or dontcare? (0)") + + db_query: str = "" + params: tuple = (user, thing,) + + already_opinionated: bool = await self.check_existence(user, thing) + if already_opinionated: + if flag == 0: + db_query: str = "DELETE FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?" + else: + loves_or_hates: str = "loves" + if already_opinionated == -1: + loves_or_hates: str = "hates" + raise LoveHateException(f"But {user} already {loves_or_hates} {thing}...") + else: + match flag: + case -1: + db_query: str = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, -1, ?)" + case 1: + db_query: str = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, 1, ?)" + case _: + raise LoveHateException("Unknown error, default case matched") + + + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + async with await db_conn.execute(db_query, params) as db_cursor: + await db_conn.commit() + if db_cursor.rowcount != 1: + raise LoveHateException(f"DB Error - RowCount: {db_cursor.rowcount} for INSERT query") + match flag: + case -1: + return f"We're done here, {user} hates {thing}." + case 0: + return f"We're done here, {user} no longer cares one way or the other about {thing}." + case 1: + return f"We're done here, {user} loves {thing}." + case _: + raise LoveHateException("Unknown error, default case matched [2]") \ No newline at end of file diff --git a/util/sing_util.py b/util/sing_util.py new file mode 100644 index 0000000..4876314 --- /dev/null +++ b/util/sing_util.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3.12 +import logging +import regex +import aiohttp +import textwrap +import traceback +from typing import Optional +from discord import Activity + +class Utility: + """Sing Utility""" + def __init__(self): + self.api_url: str = "http://127.0.0.1:52111/lyric/search" + self.api_src: str = "DISC-HAVOC" + + def parse_song_input(self, song: Optional[str] = None, + activity: Optional[Activity] = None) -> bool|tuple: + """Parse Song (Sing Command) Input + Args: + song (Optional[str]): Song to search + activity (Optional[discord.Activity]): Discord activity, used to attempt lookup if no song is provided + Returns: + bool|tuple + """ + logging.debug("Activity? %s", activity) + try: + if (not song or len(song) < 2) and not activity: + # pylint: disable=superfluous-parens + return False + # pylint: enable=superfluous-parens + if not song and activity: + match activity.name.lower(): + case "codey toons" | "cider" | "sonixd": + search_artist: str = " ".join(str(activity.state)\ + .strip().split(" ")[1:]) + search_artist: str = regex.sub(r"(\s{0,})(\[(spotify|tidal|sonixd|browser|yt music)])$", "", + search_artist.strip(), flags=regex.IGNORECASE) + search_song: str = str(activity.details) + song: str = f"{search_artist} : {search_song}" + case "tidal hi-fi": + search_artist: str = str(activity.state) + search_song: str = str(activity.details) + song: str = f"{search_artist} : {search_song}" + case "spotify": + search_artist: str = str(activity.title) + search_song: str = str(activity.artist) + song: str = f"{search_artist} : {search_song}" + case "serious.fm" | "cocks.fm" | "something": + if not activity.details: + song: str = str(activity.state) + else: + search_artist: str = str(activity.state) + search_song: str = str(activity.details) + song: str = f"{search_artist} : {search_song}" + case _: + return False # Unsupported activity detected + + search_split_by: str = ":" if not(song) or len(song.split(":")) > 1\ + else "-" # Support either : or - to separate artist/track + search_artist: str = song.split(search_split_by)[0].strip() + search_song: str = "".join(song.split(search_split_by)[1:]).strip() + search_subsearch: Optional[str] = None + if search_split_by == ":" and len(song.split(":")) > 2: # Support sub-search if : is used (per instructions) + search_song: str = song.split(search_split_by)[1].strip() # Reduce search_song to only the 2nd split of : [the rest is meant to be lyric text] + search_subsearch: str = "".join(song.split(search_split_by)[2:]) # Lyric text from split index 2 and beyond + return (search_artist, search_song, search_subsearch) + except: + traceback.print_exc() + return False + + async def lyric_search(self, artist: str, song: str, + sub: Optional[str] = None) -> list[str]: + """ + Lyric Search + Args: + artist (str): Artist to search + song (str): Song to search + sub (Optional[str]): Lyrics for subsearch + """ + try: + if not artist or not song: + return ["FAIL! Artist/Song not provided"] + + search_obj: dict = { + 'a': artist.strip(), + 's': song.strip(), + 'extra': True, + 'src': self.api_src, + } + + if len(song.strip()) < 1: + search_obj.pop('a') + search_obj.pop('s') + search_obj['t'] = artist.strip() # Parse failed, try title without sep + + if sub and len(sub) >= 2: + search_obj['sub'] = sub.strip() + + async with aiohttp.ClientSession() as session: + async with await session.post(self.api_url, + json=search_obj, + timeout=aiohttp.ClientTimeout(connect=5, sock_read=10)) as request: + request.raise_for_status() + response: dict = await request.json() + if response.get('err'): + return [f"ERR: {response.get('errorText')}"] + + out_lyrics = regex.sub(r'
', '\u200B\n', response.get('lyrics')) + response_obj: dict = { + 'artist': response.get('artist'), + 'song': response.get('song'), + 'lyrics': out_lyrics, + 'src': response.get('src'), + 'confidence': float(response.get('confidence')), + 'time': float(response.get('time')), + } + + lyrics = response_obj.get('lyrics') + response_obj['lyrics'] = textwrap.wrap(text=lyrics.strip(), + width=4000, drop_whitespace=False, + replace_whitespace=False, break_long_words=True, + break_on_hyphens=True, max_lines=8) + response_obj['lyrics_short'] = textwrap.wrap(text=lyrics.strip(), + width=750, drop_whitespace=False, + replace_whitespace=False, break_long_words=True, + break_on_hyphens=True, max_lines=1) + + return [ + ( + response_obj.get('artist'), response_obj.get('song'), response_obj.get('src'), + f"{int(response_obj.get('confidence'))}%", + f"{response_obj.get('time', -666.0):.4f}s", + ), + response_obj.get('lyrics'), + response_obj.get('lyrics_short'), + ] + except Exception as e: + traceback.print_exc() + return [f"Retrieval failed: {str(e)}"] \ No newline at end of file