such cleanup

This commit is contained in:
codey 2025-02-15 08:36:45 -05:00
parent d60828df07
commit 9d23438b13
12 changed files with 357 additions and 209 deletions

View File

@ -14,14 +14,14 @@ import regex
from regex import Pattern
from aiohttp import ClientSession, ClientTimeout
from discord.ext import bridge, commands, tasks
from disc_havoc import Havoc
class Util:
"""Karma Utility"""
def __init__(self, bot):
self.bot = bot
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.api_key: str = constants.PRV_API_KEY
self.karma_endpoints_base_url: str = "https://api.codey.lol/karma/"
self.karma_retrieval_url: str = f"{self.karma_endpoints_base_url}get"
@ -146,9 +146,9 @@ class Util:
class Karma(commands.Cog):
"""Karma Cog for Havoc"""
def __init__(self, bot):
def __init__(self, bot: Havoc):
importlib.reload(constants)
self.bot: discord.Bot = bot
self.bot: Havoc = bot
self.util = Util(self.bot)
# self.karma_regex = regex.compile(r'(\w+)(\+\+|\-\-)')
self.karma_regex: Pattern = regex.compile(r'(\b\w+(?:\s+\w+)*)(\+\+($|\s)|\-\-($|\s))')

View File

@ -10,12 +10,12 @@ import aiosqlite as sqlite3
from discord.ext import bridge, commands
from util.lovehate_db import DB
from constructors import LoveHateException
from disc_havoc import Havoc
class LoveHate(commands.Cog):
"""LoveHate Cog for Havoc"""
def __init__(self, bot):
self.bot: discord.Bot = bot
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.db = DB(self.bot)
def join_with_and(self, items: list) -> str:

View File

@ -6,12 +6,13 @@ import json
import io
import asyncio
import random
from typing import LiteralString, Optional
from typing import LiteralString, Optional, Any
import logging
import textwrap
import regex
import requests
import discord
from disc_havoc import Havoc
from aiohttp import ClientSession
from discord.ext import bridge, commands, tasks
from jesusmemes import JesusMemeGenerator
@ -56,6 +57,8 @@ class MemeView(discord.ui.View):
async def select_callback(self, select: discord.ui.Select,
interaction: discord.Interaction) -> None:
"""Meme Selection Callback"""
if not isinstance(select.values[0], str):
return
modal: discord.ui.Modal = MemeModal(meme=select.values[0], title="Meme Selected")
await interaction.response.send_modal(modal)
@ -63,7 +66,7 @@ class MemeModal(discord.ui.Modal):
"""Meme Creation discord.ui.Modal"""
def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.selected_meme: str = meme
self.selected_meme: Optional[str] = meme
self.meme_generator = JesusMemeGenerator()
self.TEXT_LIMIT: int = 80
@ -73,7 +76,14 @@ class MemeModal(discord.ui.Modal):
style=discord.InputTextStyle.singleline))
async def callback(self, interaction: discord.Interaction) -> None:
if not self.selected_meme: # No meme selected
return
selected_meme: str = self.selected_meme
if not self.children or len(self.children) < 2: # Invalid request
return
if not isinstance(self.children[0].value, str)\
or not isinstance(self.children[1].value, str): # Invalid request
return
meme_top_line: str = self.children[0].value.strip()
meme_bottom_line: str = self.children[1].value.strip()
if len(meme_top_line) > self.TEXT_LIMIT or len(meme_bottom_line) > self.TEXT_LIMIT:
@ -85,18 +95,18 @@ class MemeModal(discord.ui.Modal):
embed: discord.Embed = discord.Embed(title="Generated Meme")
embed.set_image(url=meme_link)
embed.add_field(name="Meme", value=self.selected_meme, inline=True)
embed.add_field(name="Meme", value=selected_meme, inline=True)
await interaction.response.send_message(embeds=[embed])
return
class Meme(commands.Cog):
"""Meme Cog for Havoc"""
def __init__(self, bot) -> None:
self.bot: discord.Bot = bot
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.meme_choices: list = []
self.meme_counter: int = 0
self.THREADS: dict[dict[list]] = {
self.THREADS: dict[str, dict[int, list]] = {
# Format: Guild1: [ChanId : [Webhook, ThreadId], Guild2: [ChanId : [Webhook, ThreadId]
'comic_explosm': {
1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367],
@ -120,7 +130,7 @@ class Meme(commands.Cog):
}
}
self.NO_THREAD_WEBHOOKS: dict[list] = {
self.NO_THREAD_WEBHOOKS: dict[str, list] = {
'theonion': [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2],
'thn': [constants.THN_WEBHOOK],
'memes': [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2],
@ -132,7 +142,7 @@ class Meme(commands.Cog):
self.meme_stream_loop.start()
self.explosm_loop.start()
def is_spamchan() -> bool: # pylint: disable=no-method-argument
def is_spamchan() -> bool: # type: ignore
"""Check if channel is spamchan"""
def predicate(ctx):
try:
@ -142,7 +152,7 @@ class Meme(commands.Cog):
except:
traceback.print_exc()
return False
return commands.check(predicate)
return commands.check(predicate) # type: ignore
@ -163,39 +173,43 @@ class Meme(commands.Cog):
dino_grabber = dinog.DinosaurGrabber()
onion_grabber = oniong.OnionGrabber()
thn_grabber = thng.THNGrabber()
explosm_comics = xkcd_comics = smbc_comics = qc_comics = dino_comics\
= onions = thns = []
memes: list[tuple] = await meme_grabber.get()
explosm_comics: list[Optional[tuple]] = []
xkcd_comics: list[Optional[tuple]] = []
smbc_comics: list[Optional[tuple]] = []
dino_comics: list[Optional[tuple]] = []
onions: list[Optional[tuple]] = []
thns: list[Optional[tuple]] = []
memes: list[Optional[tuple]] = await meme_grabber.get()
try:
try:
explosm_comics: list[tuple] = await explosm_grabber.get()
explosm_comics = await explosm_grabber.get()
except:
pass
try:
xkcd_comics: list[tuple] = await xkcd_grabber.get()
xkcd_comics = await xkcd_grabber.get()
except:
pass
try:
smbc_comics: list[tuple] = await smbc_grabber.get()
smbc_comics = await smbc_grabber.get()
except:
pass
try:
qc_comics: list[tuple] = await qc_grabber.get()
qc_comics = await qc_grabber.get()
print(f"QC: {qc_comics}")
except:
pass
try:
dino_comics: list[tuple] = await dino_grabber.get()
dino_comics = await dino_grabber.get()
except Exception as e:
logging.debug("Dino failed: %s", str(e))
pass
try:
onions: list[tuple] = await onion_grabber.get()
onions = await onion_grabber.get()
except Exception as e:
logging.debug("Onion failed: %s", str(e))
pass
try:
thns: list[tuple] = await thn_grabber.get()
thns = await thn_grabber.get()
except Exception as e:
logging.debug("THNs failed: %s", str(e))
pass
@ -208,12 +222,14 @@ class Meme(commands.Cog):
if not only_comics:
try:
for meme in memes:
if not meme:
continue
(meme_id, meme_title, meme_url) = meme # pylint: disable=unused-variable
request = requests.get(meme_url, stream=True, timeout=(5, 30), headers=headers)
if not request.status_code == 200:
continue
meme_content: bytes = request.raw.read()
for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes'):
for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes', {}):
meme_image: io.BytesIO = io.BytesIO(meme_content)
ext: str = meme_url.split(".")[-1]\
.split("?")[0].split("&")[0]
@ -227,24 +243,27 @@ class Meme(commands.Cog):
pass
try:
for comic in explosm_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title: str = discord.utils.escape_markdown(comic_title)
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content: bytes = comic_request.raw.read()
ext: str = comic_url.split(".")[-1]\
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_explosm').items():
for chanid, _hook in self.THREADS.get('comic_explosm', {}).items():
comic_image: io.BytesIO = io.BytesIO(comic_content)
channel: int = chanid
hook_uri: str = _hook[0]
thread_id: int = _hook[1]
webhook: discord.Webhook = discord.Webhook.from_url(hook_uri,
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
thread: discord.Thread = self.bot.get_channel(channel)\
.get_thread(thread_id)
_channel: Any = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Cyanide & Happiness", thread=thread)
await asyncio.sleep(2)
@ -252,25 +271,28 @@ class Meme(commands.Cog):
pass
try:
for comic in xkcd_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title: str = discord.utils.escape_markdown(comic_title)
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content: bytes = comic_request.raw.read()
comic_image: io.BytesIO = io.BytesIO(comic_request.raw.read())
ext: str = comic_url.split(".")[-1]\
comic_content = comic_request.raw.read()
comic_image = io.BytesIO(comic_request.raw.read())
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_xkcd').items():
comic_image: io.BytesIO = io.BytesIO(comic_content)
channel: int = chanid
hook_uri: str = _hook[0]
thread_id: int = _hook[1]
webhook: discord.Webhook = discord.Webhook.from_url(hook_uri,
for chanid, _hook in self.THREADS.get('comic_xkcd', {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
thread: discord.Thread = self.bot.get_channel(channel)\
.get_thread(thread_id)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="xkcd", thread=thread)
await asyncio.sleep(2)
@ -278,24 +300,27 @@ class Meme(commands.Cog):
pass
try:
for comic in smbc_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title: str = discord.utils.escape_markdown(comic_title)
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content: bytes = comic_request.raw.read()
ext: str = comic_url.split(".")[-1]\
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_smbc').items():
comic_image: io.BytesIO = io.BytesIO(comic_content)
channel: int = chanid
hook_uri: str = _hook[0]
thread_id: int = _hook[1]
webhook: discord.Webhook = discord.Webhook.from_url(hook_uri,
for chanid, _hook in self.THREADS.get('comic_smbc', {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
thread = self.bot.get_channel(channel)\
.get_thread(thread_id)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="SMBC", thread=thread)
await asyncio.sleep(2)
@ -304,29 +329,32 @@ class Meme(commands.Cog):
try:
for comic in qc_comics:
logging.debug("Trying QC...")
if not comic:
continue
(comic_title, comic_url) = comic
comic_title: str = discord.utils.escape_markdown(comic_title)
comic_url: str = regex.sub(r'^http://ww\.', 'http://www.',
comic_title = discord.utils.escape_markdown(comic_title)
comic_url = regex.sub(r'^http://ww\.', 'http://www.',
comic_url)
comic_url: str = regex.sub(r'\.pmg$', '.png',
comic_url = regex.sub(r'\.pmg$', '.png',
comic_url)
comic_request = requests.get(comic_url, stream=True,
timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content: bytes = comic_request.raw.read()
ext: str = comic_url.split(".")[-1]\
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_qc').items():
comic_image: io.BytesIO = io.BytesIO(comic_content)
channel: int = chanid
hook_uri: str = _hook[0]
thread_id: int = _hook[1]
webhook: discord.Webhook = discord.Webhook.from_url(hook_uri,
for chanid, _hook in self.THREADS.get('comic_qc', {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
thread: discord.Thread = self.bot.get_channel(channel)\
.get_thread(thread_id)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Questionable Content", thread=thread)
await asyncio.sleep(2)
@ -335,24 +363,27 @@ class Meme(commands.Cog):
pass
try:
for comic in dino_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title: str = discord.utils.escape_markdown(comic_title)
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content: bytes = comic_request.raw.read()
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_dino').items():
comic_image: io.BytesIO = io.BytesIO(comic_content)
channel: int = chanid
hook_uri: str = _hook[0]
thread_id: int = _hook[1]
webhook: discord.Webhook = discord.Webhook.from_url(hook_uri,
for chanid, _hook in self.THREADS.get('comic_dino', {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
thread: discord.Thread = self.bot.get_channel(channel)\
.get_thread(thread_id)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Dinosaur Comics", thread=thread)
await asyncio.sleep(2)
@ -360,15 +391,17 @@ class Meme(commands.Cog):
pass
try:
for onion in onions:
if not onion:
continue
(onion_title, onion_description, onion_link, onion_video) = onion
onion_description: list[str] = textwrap.wrap(text=onion_description,
onion_description = textwrap.wrap(text=onion_description,
width=860, max_lines=1)[0]
embed: discord.Embed = discord.Embed(title=onion_title)
embed.add_field(name="Content", value=f"{onion_description[0:960]}\n-# {onion_link}")
async with ClientSession() as session:
for hook in self.NO_THREAD_WEBHOOKS.get('theonion'):
hook_uri: str = hook
webhook: discord.Webhook = discord.Webhook.from_url(hook_uri,
for hook in self.NO_THREAD_WEBHOOKS.get('theonion', {}):
hook_uri = hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
await webhook.send(embed=embed, username="The Onion")
if onion_video:
@ -379,16 +412,18 @@ class Meme(commands.Cog):
try:
for thn in thns:
logging.debug("Trying thn...")
if not thn:
continue
(thn_title, thn_description, thn_link, thn_pubdate, thn_video) = thn
thn_description: list[str] = textwrap.wrap(text=thn_description,
thn_description = textwrap.wrap(text=thn_description,
width=860, max_lines=1)[0]
embed: discord.Embed = discord.Embed(title=thn_title)
embed = discord.Embed(title=thn_title)
embed.add_field(name="Content", value=f"{thn_description[0:960]}\n-# {thn_link}")
embed.add_field(name="Published", value=thn_pubdate, inline=False)
async with ClientSession() as session:
for hook in self.NO_THREAD_WEBHOOKS.get('thn'):
hook_uri: str = hook
webhook: discord.Webhook = discord.Webhook.from_url(hook_uri,
for hook in self.NO_THREAD_WEBHOOKS.get('thn', {}):
hook_uri = hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
await webhook.send(embed=embed, username="The Hacker News")
if thn_video:
@ -422,8 +457,8 @@ class Meme(commands.Cog):
except:
traceback.print_exc()
@bridge.bridge_command()
@is_spamchan() # pylint: disable=too-many-function-args
@bridge.bridge_command() # type: ignore
@is_spamchan()
async def meme(self, ctx) -> None:
"""Create Meme"""
await ctx.respond(view=MemeView())

View File

@ -12,6 +12,7 @@ from .misc_util import Util
import aiosqlite as sqlite3
from sh import cowsay as cow_say, fortune # pylint: disable=no-name-in-module
from discord.ext import bridge, commands, tasks
from disc_havoc import Havoc
# pylint: disable=bare-except, broad-exception-caught, broad-exception-raised, global-statement
# pylint: disable=too-many-lines, invalid-name
@ -25,8 +26,8 @@ BOT_CHANIDS = []
class Misc(commands.Cog):
"""Misc/Assorted Cog for Havoc"""
def __init__(self, bot):
self.bot: discord.Bot = bot
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.util = Util()
self.COWS: list[str] = os.listdir(os.path.join("/",

View File

@ -9,13 +9,14 @@ from typing import Optional
import discord
import requests
from discord.ext import bridge, commands
from disc_havoc import Havoc
import util
class Owner(commands.Cog):
"""Owner Cog for Havoc"""
def __init__(self, bot) -> None:
self.bot: discord.Bot = bot
self.former_roles_store: dict = {}
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.former_roles_store: dict[int, list[discord.Role]] = {}
self._temperature: int = random.randrange(20, 30)
@bridge.bridge_command(guild_ids=[1145182936002482196])
@ -41,7 +42,7 @@ class Owner(commands.Cog):
return await ctx.respond("Too cold! (-15°C minimum)")
elif _temperature > 35:
return await ctx.respond("Too hot! (35°C maximum)")
self._temperature: int = _temperature
self._temperature = _temperature
return await ctx.respond(f"As per your request, I have adjusted the temperature to {_temperature} °C.")
@bridge.bridge_command()
@ -54,6 +55,7 @@ class Owner(commands.Cog):
Returns:
None
"""
self.bot.load_exts(False)
await ctx.respond("Reloaded!", ephemeral=True)
@ -70,20 +72,18 @@ class Owner(commands.Cog):
Returns:
None
"""
parameters: list[str] = parameters.split(" ")
_parameters: list[str] = parameters.split(" ")
if not len(parameters) > 1:
if not len(_parameters) > 1:
return await ctx.respond("**Error**: Incorrect command usage; required: <chan> <msg>", ephemeral=True)
channel: str = parameters[0]
channel_mentions: list[str] = discord.utils.raw_channel_mentions(channel)
channel: str = _parameters[0]
channel_mentions: list[int] = discord.utils.raw_channel_mentions(channel)
if channel_mentions:
channel: str = str(channel_mentions[0])
channel = str(channel_mentions[0])
msg: str = " ".join(parameters[1:])
sent = await util.discord_helpers.send_message(self.bot, channel=channel,
await util.discord_helpers.send_message(self.bot, channel=channel,
message=msg)
if not sent:
return await ctx.respond("**Failed.**", ephemeral=True)
return await ctx.respond("**Done.**", ephemeral=True)
@bridge.bridge_command()
@ -144,6 +144,8 @@ class Owner(commands.Cog):
None
"""
try:
if not isinstance(message.channel, discord.TextChannel):
return
memes_channel: discord.TextChannel = ctx.guild.get_channel(1147229098544988261)
message_content: str = message.content
message_author: str = message.author.display_name
@ -156,7 +158,9 @@ class Owner(commands.Cog):
timeout=20).raw.read())
ext: str = item.url.split(".")[-1]\
.split("?")[0].split("&")[0]
_file: discord.File = discord.File(image, filename=f'img.{ext}')
_file = discord.File(image, filename=f'img.{ext}')
if not _file:
return # No file to move
await memes_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...*\n**{message_author}:** {message_content}", file=_file)
await message.delete()
await ctx.respond("OK!", ephemeral=True)
@ -176,6 +180,8 @@ class Owner(commands.Cog):
None
"""
try:
if not isinstance(message.channel, discord.TextChannel):
return
drugs_channel: discord.TextChannel = ctx.guild.get_channel(1172247451047034910)
message_content: str = message.content
message_author: str = message.author.display_name
@ -188,7 +194,9 @@ class Owner(commands.Cog):
timeout=20).raw.read())
ext: str = item.url.split(".")[-1]\
.split("?")[0].split("&")[0]
_file: discord.File = discord.File(image, filename=f'img.{ext}')
_file = discord.File(image, filename=f'img.{ext}')
if not _file:
return # No file to move
await drugs_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...\
*\n**{message_author}:** {message_content}", file=_file)
await message.delete()
@ -209,6 +217,8 @@ class Owner(commands.Cog):
None
"""
try:
if not isinstance(message.channel, discord.TextChannel):
return
funhouse_channel: discord.TextChannel = ctx.guild.get_channel(1213160512364478607)
message_content: str = message.content
message_author: str = message.author.display_name
@ -221,7 +231,7 @@ class Owner(commands.Cog):
timeout=20).raw.read())
ext: str = item.url.split(".")[-1]\
.split("?")[0].split("&")[0]
_file: discord.File = discord.File(image, filename=f'img.{ext}')
_file = discord.File(image, filename=f'img.{ext}')
await funhouse_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})\
...*\n**{message_author}:** {message_content}")
await message.delete()
@ -244,12 +254,14 @@ class Owner(commands.Cog):
try:
if not ctx.guild.id == 1145182936002482196:
return # Not home server!
if not member.roles:
return # No roles
audit_reason: str = f"Einsperren von {ctx.user.display_name}"
member: discord.Member = ctx.guild.get_member(member.id)
member = ctx.guild.get_member(member.id)
member_display: str = member.display_name
einsperren_role: discord.Role = ctx.guild.get_role(1235415059300093973) if ctx.guild.id != 1145182936002482196\
else ctx.guild.get_role(1235406301614309386)
member_roles: list[discord.Role] = [role for role in member.roles if not role.name == "@everyone"]
member_roles: list = [role for role in member.roles if not role.name == "@everyone"]
member_role_names: list[str] = [str(role.name).lower() for role in member_roles]
opers_chan: discord.TextChannel = ctx.guild.get_channel(1181416083287187546)
if not "einsperren" in member_role_names:
@ -271,7 +283,7 @@ class Owner(commands.Cog):
if not member.id in self.former_roles_store:
await member.edit(roles=[]) # No roles
else:
former_roles: list[discord.Role] = self.former_roles_store.get(member.id)
former_roles: list = self.former_roles_store.get(member.id, [0])
await member.edit(roles=former_roles, reason=f"De-{audit_reason}")
await ctx.respond(f"{member_display} wurde von der Einsperre befreit.", ephemeral=True)

View File

@ -13,11 +13,12 @@ import asyncio
import discord
import aiosqlite as sqlite3
from discord.ext import bridge, commands
from disc_havoc import Havoc
class DB:
"""DB Utility for Quote Cog"""
def __init__(self, bot):
self.bot = bot
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.db_path = os.path.join("/", "usr", "local", "share",
"sqlite_dbs", "quotes.db")
self.hp_chanid = 1157529874936909934
@ -120,8 +121,8 @@ class DB:
class Quote(commands.Cog):
"""Quote Cog for Havoc"""
def __init__(self, bot):
self.bot = bot
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.db = DB(self.bot)
def is_homeserver(): # pylint: disable=no-method-argument

View File

@ -7,12 +7,13 @@ from typing import Optional
from discord.ext import bridge, commands, tasks
from util.radio_util import get_now_playing
import discord
from disc_havoc import Havoc
class Radio(commands.Cog):
"""Radio Cog for Havoc"""
def __init__(self, bot: discord.Bot) -> None:
self.bot: discord.Bot = bot
self.channels: dict[tuple] = {
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.channels: dict[str, tuple] = {
'sfm': (1145182936002482196, 1221615558492029050), # Tuple: Guild Id, Chan Id
}
self.STREAM_URL: str = "https://relay.sfm.codey.lol/aces.ogg"

View File

@ -10,20 +10,22 @@ import discord
import regex
from util.sing_util import Utility
from discord.ext import bridge, commands
from disc_havoc import Havoc
BOT_CHANIDS = []
class Sing(commands.Cog):
"""Sing Cog for Havoc"""
def __init__(self, bot):
self.bot: discord.Bot = bot
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.utility = Utility()
global BOT_CHANIDS
BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit
self.control_strip_regex: Pattern = regex.compile(r"\x0f|\x1f|\035|\002|\u2064|\x02|(\x03([0-9]{1,2}))|(\x03|\003)(?:\d{1,2}(?:,\d{1,2})?)?",
regex.UNICODE)
def is_spamchan(): # pylint: disable=no-method-argument
def is_spamchan(): # type: ignore
"""Check if channel is spam chan"""
def predicate(ctx):
try:
@ -57,7 +59,7 @@ class Sing(commands.Cog):
for _activity in ctx.author.activities:
if _activity.type == discord.ActivityType.listening:
activity: discord.Activity = _activity
activity = _activity
if not activity:
return await ctx.respond("**Error**: No song specified, no activity found to read.")
@ -65,7 +67,9 @@ class Sing(commands.Cog):
if interaction:
await ctx.respond("*Searching...*", ephemeral=True) # Must respond to interactions within 3 seconds, per Discord
(search_artist, search_song, search_subsearch) = self.utility.parse_song_input(song, activity)
parsed = self.utility.parse_song_input(song, activity)
if isinstance(parsed, tuple):
(search_artist, search_song, search_subsearch) = parsed
# await ctx.respond(f"So, {search_song} by {search_artist}? Subsearch: {search_subsearch} I will try...") # Commented, useful for debugging
search_result: list[str] = await self.utility.lyric_search(search_artist, search_song,
@ -73,13 +77,16 @@ class Sing(commands.Cog):
if len(search_result) == 1:
return await ctx.respond(search_result[0].strip())
(search_result_artist, search_result_song, search_result_src,
search_result_confidence, search_result_time_taken) = search_result[0] # First index is a tuple
if not isinstance(search_result[0], tuple):
return # Invalid data type
(
search_result_artist, search_result_song, search_result_src,
search_result_confidence, search_result_time_taken
) = search_result[0] # First index is a tuple
search_result_wrapped: list[str] = search_result[1] # Second index is the wrapped lyrics
search_result_wrapped_short: list[str] = search_result[2] # Third is short wrapped lyrics
if not ctx.channel.id in BOT_CHANIDS:
search_result_wrapped: list[str] = search_result_wrapped_short # Replace with shortened lyrics for non spamchans
search_result_wrapped = search_result_wrapped_short # Replace with shortened lyrics for non spamchans
embeds: list[Optional[discord.Embed]] = []
embed_url: str = f"[on codey.lol](https://codey.lol/#{urllib.parse.quote(search_artist)}/{urllib.parse.quote(search_song)})"
c: int = 0
@ -87,8 +94,8 @@ class Sing(commands.Cog):
for section in search_result_wrapped:
c+=1
if c == len(search_result_wrapped):
footer: str = f"Found on: {search_result_src}"
section: str = self.control_strip_regex.sub('', section)
footer = f"Found on: {search_result_src}"
section = self.control_strip_regex.sub('', section)
# if ctx.guild.id == 1145182936002482196:
# section = section.upper()
embed: discord.Embed = discord.Embed(
@ -131,7 +138,7 @@ class Sing(commands.Cog):
activity: Optional[discord.Activity] = None
for _activity in ctx.interaction.guild.get_member(member_id).activities:
if _activity.type == discord.ActivityType.listening:
activity: discord.Activity = _activity
activity = _activity
parsed: tuple|bool = self.utility.parse_song_input(song=None,
activity=activity)
if not parsed:
@ -139,6 +146,7 @@ class Sing(commands.Cog):
if IS_SPAMCHAN:
await ctx.respond(f"***Reading activity of {member_display}...***")
if isinstance(parsed, tuple):
(search_artist, search_song, search_subsearch) = parsed
await ctx.respond("*Searching...*", ephemeral=True) # Must respond to interactions within 3 seconds, per Discord
search_result: list = await self.utility.lyric_search(search_artist, search_song,
@ -153,7 +161,7 @@ class Sing(commands.Cog):
search_result_wrapped_short: list[str] = search_result[2] # Third index is shortened lyrics
if not IS_SPAMCHAN:
search_result_wrapped: list[str] = search_result_wrapped_short # Swap for shortened lyrics if not spam chan
search_result_wrapped = search_result_wrapped_short # Swap for shortened lyrics if not spam chan
embeds: list[Optional[discord.Embed]] = []
c: int = 0
@ -161,7 +169,7 @@ class Sing(commands.Cog):
for section in search_result_wrapped:
c+=1
if c == len(search_result_wrapped):
footer: str = f"Found on: {search_result_src}"
footer = f"Found on: {search_result_src}"
# if ctx.guild.id == 1145182936002482196:
# section = section.upper()
embed: discord.Embed = discord.Embed(
@ -174,8 +182,8 @@ class Sing(commands.Cog):
embed.set_footer(text=footer)
embeds.append(embed)
for embed in embeds:
await ctx.send(embed=embed)
for _embed in embeds:
await ctx.send(embed=_embed)
except Exception as e:
traceback.print_exc()
return await ctx.respond(f"ERR: {str(e)}")

View File

@ -10,6 +10,7 @@ import setproctitle
import hypercorn
import hypercorn.asyncio
from dotenv import load_dotenv
from asyncio import Task
from discord.ext import bridge, commands
from termcolor import colored
from constants import OWNERS, BOT_CHANIDS
@ -38,23 +39,23 @@ load_dotenv()
intents = discord.Intents.all()
intents.message_content = True
bot = bridge.Bot(command_prefix=".", intents=intents,
class Havoc(bridge.Bot):
def __init__(self) -> None:
super().__init__(command_prefix=".", intents=intents,
owner_ids=OWNERS, activity=bot_activity,
help_command=commands.MinimalHelpCommand())
self.BOT_CHANIDS = BOT_CHANIDS
@bot.event
async def on_ready() -> None:
"""Run on Bot Ready"""
logging.info("%s online!", bot.user)
def load_exts(initialRun: Optional[bool] = True) -> None:
def load_exts(self, initialRun: Optional[bool] = True) -> None:
"""
Load Cogs/Extensions
Args:
initialRun (Optional[bool]) default: True
Returns:
None"""
load_method = bot.load_extension if initialRun else bot.reload_extension
load_method = self.load_extension if initialRun\
else self.reload_extension
for cog in cogs_list:
logging.info("Loading: %s", cog)
@ -63,25 +64,31 @@ def load_exts(initialRun: Optional[bool] = True) -> None:
importlib.reload(api)
from api import API # pylint: disable=unused-import
api_config = hypercorn.config.Config()
api_config.bind = "10.10.10.100:5992"
api_instance = api.API(bot)
api_config.bind = ["10.10.10.100:5992"]
api_instance = api.API(self)
try:
bot.fapi_task.cancel()
self.fapi_task.cancel()
except Exception as e:
logging.debug("Failed to cancel fapi_task: %s",
str(e))
logging.info("Starting FAPI Task")
bot.fapi_task = bot.loop.create_task(hypercorn.asyncio.serve(api_instance.api_app,
self.fapi_task: Task = self.loop.create_task(hypercorn.asyncio.serve(api_instance.api_app,
api_config))
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Run on Bot Ready"""
logging.info("%s online!", self.user)
def __init__() -> None:
logging.info(colored(f"Log level: {logging.getLevelName(logging.root.level)}",
"red", attrs=['reverse']))
bot.BOT_CHANIDS = BOT_CHANIDS
bot.load_exts = load_exts
bot.load_exts()
bot = Havoc()
bot.run(os.getenv('TOKEN'))
if __name__ == "__main__":

50
jesusmemes.py Normal file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3.11
# Jesus Meme Generator (requires Catbox uploader)
import aiohttp # Not part of Python core
import asyncio # Part of Python core
import regex # Not part of Python core
import os # Part of Python core
import random # Part of Python core
import traceback
from urllib.parse import quote as urlquote
from catbox import Catbox # Not part of Python core
class JesusMemeGenerator():
def __init__(self):
self.MEMEAPIURL = "https://apimeme.com/meme?meme="
self.MEMESTORAGEDIR = os.path.join(os.path.expanduser("~"), "memes") # Save memes "temporarily" to the home directory-->'memes' subfolder; cleanup must be done externally
async def create_meme(self, top_line='', bottom_line='', meme="Jesus-Talking-To-Cool-Dude"):
try:
top_line = regex.sub('[^\p{Letter}\p{Number}\p{Punctuation}\p{Horiz_Space}\p{Currency_Symbol}]', '', top_line.strip())
bottom_line = regex.sub('[^\p{Letter}\p{Number}\p{Punctuation}\p{Horiz_Space}\p{Currency_Symbol}]', '', bottom_line.strip())
OUT_FNAME = ''
if len(top_line) < 1 or len(bottom_line) < 1:
return None
formed_url = f"{self.MEMEAPIURL}{meme}&top={top_line.strip()}&bottom={bottom_line.strip()}"
formed_url = regex.sub('\p{Horiz_Space}', '+', regex.sub('#', '%23', formed_url.strip()))
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(formed_url) as response:
UUID = f"{random.getrandbits(8)}-{random.getrandbits(8)}"
OUT_FNAME = f"{UUID}.jpg"
with open(f"{self.MEMESTORAGEDIR}/{OUT_FNAME}", 'wb') as f:
f.write(await response.read())
if len (OUT_FNAME) > 0:
uploader = Catbox()
meme_link = uploader.upload(f"{self.MEMESTORAGEDIR}/{OUT_FNAME}")
return meme_link
except:
print(traceback.format_exc())
pass
return None

28
pyproject.toml Normal file
View File

@ -0,0 +1,28 @@
[project]
name = "havoc"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"aiosqlite>=0.20.0",
"beautifulsoup4>=4.12.3",
"bohancompliment>=0.2.0",
"fastapi>=0.115.7",
"feedparser>=6.0.11",
"hypercorn>=0.17.3",
"nvdlib>=0.7.9",
"openai>=1.60.0",
"py-cord[voice]>=2.6.1",
"pydantic>=2.10.6",
"python-dotenv>=1.0.1",
"pytz>=2024.2",
"regex>=2024.11.6",
"setproctitle>=1.3.4",
"sh>=2.2.1",
"termcolor>=2.5.0",
"websockets>=14.2",
]
[tool.mypy]
disable_error_code = ["import-untyped"]

View File

@ -18,16 +18,19 @@ async def get_channel_by_name(bot: discord.Bot, channel: str,
Returns:
Optional[Any]
"""
channel: str = re.sub(r'^#', '', channel.strip())
channel = re.sub(r'^#', '', channel.strip())
if not guild:
return discord.utils.get(bot.get_all_channels(),
name=channel)
else:
channels: list = bot.get_guild(guild).channels
_guild: Optional[discord.Guild] = bot.get_guild(guild)
if not _guild:
return None
channels: list = _guild.channels
for _channel in channels:
if _channel.name.lower() == channel.lower().strip():
return _channel
return
return None
async def send_message(bot: discord.Bot, channel: str,
message: str, guild: int | None = None) -> None:
@ -43,10 +46,12 @@ async def send_message(bot: discord.Bot, channel: str,
None
"""
if channel.isnumeric():
channel: int = int(channel)
_channel = bot.get_channel(channel)
channel_int: int = int(channel)
_channel = bot.get_channel(channel_int)
else:
channel: str = re.sub(r'^#', '', channel.strip())
channel = re.sub(r'^#', '', channel.strip())
_channel = await get_channel_by_name(bot=bot,
channel=channel, guild=guild)
if not isinstance(_channel, discord.TextChannel):
return None
await _channel.send(message)