reformat (black)

This commit is contained in:
codey 2025-04-17 14:35:56 -04:00
parent d12b066c8e
commit 1bb482315e
20 changed files with 1928 additions and 1326 deletions

14
api.py
View File

@ -6,6 +6,7 @@ from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import util
class ValidSendMsgRequest(BaseModel):
"""
- **guild**: optional, guild id in case multiple channels match (normally first result would be used)
@ -17,18 +18,19 @@ class ValidSendMsgRequest(BaseModel):
channel: str
message: str
class API:
"""API [FastAPI Instance] for Havoc"""
def __init__(self, discord_bot):
api_app = FastAPI(title="Havoc API")
self.bot = discord_bot
self.api_app = api_app
@api_app.get("/{any:path}")
def block_get():
raise HTTPException(status_code=403, detail="Invalid request")
@api_app.post("/send_msg")
async def send_msg_handler(data: ValidSendMsgRequest):
await util.discord_helpers.send_message(
@ -38,12 +40,14 @@ class API:
message=data.message,
)
return {
'result': "presumed_success",
"result": "presumed_success",
}
def __init__():
import util
importlib.reload(util)
__init__()
__init__()

View File

@ -1,6 +1,7 @@
import sys
from os import path
sys.path.append( path.dirname( path.dirname( path.abspath(__file__) ) ) )
sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
import constants
import traceback
import time
@ -14,22 +15,24 @@ from aiohttp import ClientSession, ClientTimeout
from discord.ext import bridge, commands, tasks
from disc_havoc import Havoc
class Util:
"""Karma Utility"""
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.api_key: str = constants.PRV_API_KEY
self.karma_endpoints_base_url: str = "https://api.codey.lol/karma/"
self.karma_retrieval_url: str = f"{self.karma_endpoints_base_url}get"
self.karma_update_url: str = f"{self.karma_endpoints_base_url}modify"
self.karma_top_10_url: str = f"{self.karma_endpoints_base_url}top"
self.timers: dict = {} # discord uid : timestamp, used for rate limiting
self.karma_cooldown: int = 15 # 15 seconds between karma updates
self.karma_top_10_url: str = f"{self.karma_endpoints_base_url}top"
self.timers: dict = {} # discord uid : timestamp, used for rate limiting
self.karma_cooldown: int = 15 # 15 seconds between karma updates
async def get_karma(self, keyword: str) -> int:
"""
Get Karma for Keyword
Args:
keyword (str)
Returns:
@ -37,65 +40,75 @@ class Util:
"""
try:
async with ClientSession() as session:
async with await session.post(self.karma_retrieval_url,
json={'keyword': keyword},
headers={
'content-type': 'application/json; charset=utf-8',
'X-Authd-With': f'Bearer {constants.KARMA_API_KEY}',
}, timeout=ClientTimeout(connect=3, sock_read=5)) as request:
async with await session.post(
self.karma_retrieval_url,
json={"keyword": keyword},
headers={
"content-type": "application/json; charset=utf-8",
"X-Authd-With": f"Bearer {constants.KARMA_API_KEY}",
},
timeout=ClientTimeout(connect=3, sock_read=5),
) as request:
resp = await request.json()
return resp.get('count')
return resp.get("count")
except Exception as e:
traceback.print_exc()
return False
async def get_top(self, n: int = 10) -> Optional[dict]:
"""
Get top (n=10) Karma
Args:
n (int): Number of top results to return, default 10
Returns:
Optional[dict]
Optional[dict]
"""
try:
async with ClientSession() as session:
async with await session.post(self.karma_top_10_url,
json = {
'n': n,
},
headers={
'content-type': 'application/json; charset=utf-8',
'X-Authd-With': f'Bearer {constants.KARMA_API_KEY}'
}, timeout=ClientTimeout(connect=3, sock_read=5)) as request:
async with await session.post(
self.karma_top_10_url,
json={
"n": n,
},
headers={
"content-type": "application/json; charset=utf-8",
"X-Authd-With": f"Bearer {constants.KARMA_API_KEY}",
},
timeout=ClientTimeout(connect=3, sock_read=5),
) as request:
resp: dict = await request.json()
return resp
except:
traceback.print_exc()
return None
async def get_top_embed(self, n:int = 10) -> Optional[discord.Embed]:
async def get_top_embed(self, n: int = 10) -> Optional[discord.Embed]:
"""
Get Top Karma Embed
Args:
n (int): Number of top results to return, default 10
Returns:
Optional[discord.Embed]
Optional[discord.Embed]
"""
top: Optional[dict] = await self.get_top(n)
if not top:
return None
top_formatted: str = ""
for x, item in enumerate(top):
top_formatted += f"{x+1}. **{discord.utils.escape_markdown(item[0])}**: *{item[1]}*\n"
top_formatted += (
f"{x+1}. **{discord.utils.escape_markdown(item[0])}**: *{item[1]}*\n"
)
top_formatted = top_formatted.strip()
embed: discord.Embed = discord.Embed(title=f"Top {n} Karma",
description=top_formatted,
colour=0xff00ff)
embed: discord.Embed = discord.Embed(
title=f"Top {n} Karma", description=top_formatted, colour=0xFF00FF
)
return embed
async def update_karma(self, display: str, _id: int, keyword: str, flag: int) -> bool:
async def update_karma(
self, display: str, _id: int, keyword: str, flag: int
) -> bool:
"""
Update Karma for Keyword
Args:
@ -109,33 +122,34 @@ class Util:
"""
if not flag in [0, 1]:
return False
reqObj: dict = {
'granter': f"Discord: {display} ({_id})",
'keyword': keyword,
'flag': flag,
"granter": f"Discord: {display} ({_id})",
"keyword": keyword,
"flag": flag,
}
try:
async with ClientSession() as session:
async with await session.post(self.karma_update_url,
json=reqObj,
headers={
'content-type': 'application/json; charset=utf-8',
'X-Authd-With': f'Bearer {self.api_key}',
},
timeout=ClientTimeout(connect=3, sock_read=5)) as request:
async with await session.post(
self.karma_update_url,
json=reqObj,
headers={
"content-type": "application/json; charset=utf-8",
"X-Authd-With": f"Bearer {self.api_key}",
},
timeout=ClientTimeout(connect=3, sock_read=5),
) as request:
result = await request.json()
return result.get('success', False)
return result.get("success", False)
except:
traceback.print_exc()
return False
async def check_cooldown(self, user_id: int) -> bool:
"""
Check if member has met cooldown period prior to adjusting karma
Args:
user_id (int): The Discord UID to check
Returns:
@ -147,19 +161,21 @@ class Util:
if (now - self.timers[user_id]) < self.karma_cooldown:
return False
return True
class Karma(commands.Cog):
"""Karma Cog for Havoc"""
def __init__(self, bot: Havoc):
importlib.reload(constants)
self.bot: Havoc = bot
self.util = Util(self.bot)
# self.karma_regex = regex.compile(r'(\w+)(\+\+|\-\-)')
self.karma_regex: Pattern = regex.compile(r'(\b\w+(?:\s+\w+)*)(\+\+($|\s)|\-\-($|\s))')
self.mention_regex: Pattern = regex.compile(r'(<@([0-9]{17,20})>)(\+\+|\-\-)')
self.mention_regex_no_flag: Pattern = regex.compile(r'(<@([0-9]{17,20})>+)')
self.karma_regex: Pattern = regex.compile(
r"(\b\w+(?:\s+\w+)*)(\+\+($|\s)|\-\-($|\s))"
)
self.mention_regex: Pattern = regex.compile(r"(<@([0-9]{17,20})>)(\+\+|\-\-)")
self.mention_regex_no_flag: Pattern = regex.compile(r"(<@([0-9]{17,20})>+)")
self.karma_chanid: int = 1307065684785893406
self.karma_msgid: int = 1325442184572567686
@ -170,7 +186,6 @@ class Karma(commands.Cog):
except Exception as e:
pass
@tasks.loop(seconds=30, reconnect=True)
async def update_karma_chan(self) -> None:
"""Update the Karma Chan Leaderboard"""
@ -180,42 +195,48 @@ class Karma(commands.Cog):
if not isinstance(channel, discord.TextChannel):
return
message_to_edit = await channel.fetch_message(self.karma_msgid)
await message_to_edit.edit(embed=top_embed,
content="## This message will automatically update periodically.")
await message_to_edit.edit(
embed=top_embed,
content="## This message will automatically update periodically.",
)
except:
traceback.print_exc()
@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
"""
Message hook, to monitor for ++/--
Also monitors for messages to #karma to autodelete, only Havoc may post in #karma!
"""
if not self.bot.user: # No valid client instance
if not self.bot.user: # No valid client instance
return
if not isinstance(message.channel, discord.TextChannel):
return
if message.channel.id == self.karma_chanid and not message.author.id == self.bot.user.id:
if (
message.channel.id == self.karma_chanid
and not message.author.id == self.bot.user.id
):
"""Message to #karma not by Havoc, delete it"""
await message.delete(reason="Messages to #karma are not allowed")
removal_embed: discord.Embed = discord.Embed(
title="Message Deleted",
description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed."
description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed.",
)
await message.author.send(embed=removal_embed)
if message.author.id == self.bot.user.id: # Bots own message
if message.author.id == self.bot.user.id: # Bots own message
return
if not message.guild:
return
if not message.guild.id in [1145182936002482196, 1228740575235149855]: # Not a valid guild for cmd
if not message.guild.id in [
1145182936002482196,
1228740575235149855,
]: # Not a valid guild for cmd
return
message_content: str = message.content.strip()
mentions: list = regex.findall(self.mention_regex, message_content)
for mention in mentions:
try:
logging.debug("Mention: %s", mention)
@ -235,26 +256,28 @@ class Karma(commands.Cog):
message_content = discord.utils.escape_markdown(message_content)
karma_regex: list[str] = regex.findall(self.karma_regex, message_content.strip())
karma_regex: list[str] = regex.findall(
self.karma_regex, message_content.strip()
)
if not karma_regex: # Not a request to adjust karma
return
flooding: bool = not await self.util.check_cooldown(message.author.id)
exempt_uids: list[int] = [1172340700663255091, 992437729927376996]
exempt_uids: list[int] = [1172340700663255091, 992437729927376996]
if flooding and not message.author.id in exempt_uids:
return await message.add_reaction(emoji="")
processed_keywords_lc: list[str] = []
logging.debug("Matched: %s", karma_regex)
for matched_keyword in karma_regex:
if not isinstance(matched_keyword, tuple):
continue
if len(matched_keyword) == 4:
(keyword, friendly_flag, _, __) = matched_keyword
else:
(keyword, friendly_flag) = matched_keyword
(keyword, friendly_flag) = matched_keyword
now: int = int(time.time())
flag: int = None
@ -269,13 +292,14 @@ class Karma(commands.Cog):
if keyword.lower() in processed_keywords_lc:
continue
processed_keywords_lc.append(keyword.lower())
self.util.timers[message.author.id] = now
updated: bool = await self.util.update_karma(message.author.display_name,
message.author.id, keyword, flag)
updated: bool = await self.util.update_karma(
message.author.display_name, message.author.id, keyword, flag
)
if updated:
return await message.add_reaction(emoji="👍")
@ -288,9 +312,9 @@ class Karma(commands.Cog):
if not top_10_embed:
return
return await ctx.respond(embed=top_10_embed)
keyword = discord.utils.escape_markdown(keyword)
mentions: list[str] = regex.findall(self.mention_regex_no_flag, keyword)
for mention in mentions:
@ -299,7 +323,9 @@ class Karma(commands.Cog):
guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id)
if not guild:
return
guild_member: Optional[discord.Member] = guild.get_member(mentioned_uid)
guild_member: Optional[discord.Member] = guild.get_member(
mentioned_uid
)
if not guild_member:
return
display = guild_member.display_name
@ -310,21 +336,22 @@ class Karma(commands.Cog):
score: int = await self.util.get_karma(keyword)
description: str = f"**{keyword}** has a karma of *{score}*"
embed: discord.Embed = discord.Embed(title=f"Karma for {keyword}",
description=description)
embed: discord.Embed = discord.Embed(
title=f"Karma for {keyword}", description=description
)
return await ctx.respond(embed=embed)
except Exception as e:
await ctx.respond(f"Error: {str(e)}")
traceback.print_exc()
def cog_unload(self) -> None:
try:
self.update_karma_chan.cancel()
except:
"""Safe to ignore"""
pass
def setup(bot) -> None:
"""Run on Cog Load"""
bot.add_cog(Karma(bot))

View File

@ -5,8 +5,10 @@ from discord.ext import bridge, commands
from util.lovehate_db import DB
from disc_havoc import Havoc
class LoveHate(commands.Cog):
"""LoveHate Cog for Havoc"""
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.db = DB(self.bot)
@ -14,16 +16,15 @@ class LoveHate(commands.Cog):
def join_with_and(self, items: list) -> str:
"""
Join list with and added before last item
Args:
items (list)
Returns:
str
str
"""
if len(items) > 1:
return ', '.join(items[:-1]) + ' and ' + items[-1]
return items[0] if items else ''
return ", ".join(items[:-1]) + " and " + items[-1]
return items[0] if items else ""
@bridge.bridge_command()
async def loves(self, ctx, user: Optional[str] = None) -> None:
@ -33,31 +34,32 @@ class LoveHate(commands.Cog):
try:
if not user:
display_name = ctx.author.display_name
loves: Union[list[tuple], bool] = await self.db.get_lovehates(user=display_name,
loves=True)
loves: Union[list[tuple], bool] = await self.db.get_lovehates(
user=display_name, loves=True
)
if not loves:
return await ctx.respond("You don't seem to love anything...")
out_loves: list = []
if not isinstance(loves, list):
return
for love in loves:
(love,) = love
out_loves.append(love)
out_loves_str: str = self.join_with_and(out_loves)
return await ctx.respond(f"{ctx.author.mention} loves {out_loves_str}")
loves = await self.db.get_lovehates(user=user.strip(), loves=True)
if not loves:
return await ctx.respond(f"{user} doesn't seem to love anything...")
return await ctx.respond(f"{user} doesn't seem to love anything...")
out_loves_str = self.join_with_and(out_loves)
return await ctx.respond(f"{user} loves {out_loves_str}")
except Exception as e:
traceback.print_exc()
return await ctx.respond(f"Error: {str(e)}")
@bridge.bridge_command()
async def wholoves(self, ctx, *, thing: Optional[str] = None) -> None:
"""
@ -70,38 +72,39 @@ class LoveHate(commands.Cog):
_thing = thing
if discord.utils.raw_mentions(_thing):
# There are mentions
thing_id: int = discord.utils.raw_mentions(_thing)[0] # First mention
thing_id: int = discord.utils.raw_mentions(_thing)[0] # First mention
guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id)
if not guild:
return
thing_member: Optional[discord.Member] = guild.get_member(thing_id)
if not thing_member:
return
_thing = thing_member.display_name
if not _thing:
return
who_loves: Union[list, bool] = await self.db.get_wholovehates(thing=_thing,
loves=True)
who_loves: Union[list, bool] = await self.db.get_wholovehates(
thing=_thing, loves=True
)
if not isinstance(who_loves, list):
return await ctx.respond(f"I couldn't find anyone who loves {thing}...")
out_wholoves: list = []
for lover in who_loves:
(lover,) = lover
out_wholoves.append(str(lover))
optional_s: str = "s" if len(out_wholoves) == 1 else ""
out_wholoves_str: str = self.join_with_and(out_wholoves)
return await ctx.respond(f"{out_wholoves_str} love{optional_s} {thing}")
except Exception as e:
traceback.print_exc()
return await ctx.respond(f"Error: {str(e)}")
@bridge.bridge_command()
async def whohates(self, ctx, *, thing: Optional[str] = None) -> None:
"""
@ -117,17 +120,18 @@ class LoveHate(commands.Cog):
guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id)
if not guild:
return
thing_id: int = discord.utils.raw_mentions(_thing)[0] # First mention
thing_id: int = discord.utils.raw_mentions(_thing)[0] # First mention
thing_member: Optional[discord.Member] = guild.get_member(thing_id)
if not thing_member:
return
_thing = thing_member.display_name
_thing = thing_member.display_name
who_hates: Union[list[tuple], bool] = await self.db.get_wholovehates(thing=_thing,
hates=True)
who_hates: Union[list[tuple], bool] = await self.db.get_wholovehates(
thing=_thing, hates=True
)
if not who_hates:
return await ctx.respond(f"I couldn't find anyone who hates {thing}...")
out_whohates: list = []
if not isinstance(who_hates, list):
return
@ -136,7 +140,7 @@ class LoveHate(commands.Cog):
out_whohates.append(str(hater))
optional_s: str = "s" if len(out_whohates) == 1 else ""
out_whohates_str: str = self.join_with_and(out_whohates)
return await ctx.respond(f"{out_whohates_str} hate{optional_s} {thing}")
@ -144,16 +148,14 @@ class LoveHate(commands.Cog):
traceback.print_exc()
return await ctx.respond(f"Error: {str(e)}")
@bridge.bridge_command()
async def dontcare(self, ctx, thing: str) -> None:
"""
Make me forget your opinion on <thing>
"""
try:
stop_caring: str = await self.db.update(ctx.author.display_name,
thing, 0)
return await ctx.respond(stop_caring)
stop_caring: str = await self.db.update(ctx.author.display_name, thing, 0)
return await ctx.respond(stop_caring)
except Exception as e:
await ctx.respond(f"Error: {str(e)}")
traceback.print_exc()
@ -166,29 +168,30 @@ class LoveHate(commands.Cog):
try:
if not user:
display_name = ctx.author.display_name
hates: Union[list[tuple], bool] = await self.db.get_lovehates(user=display_name,
hates=True)
hates: Union[list[tuple], bool] = await self.db.get_lovehates(
user=display_name, hates=True
)
if not hates:
return await ctx.respond("You don't seem to hate anything...")
else:
hates = await self.db.get_lovehates(user=user.strip(), hates=True)
if not hates:
return await ctx.respond(f"{user} doesn't seem to hate anything...")
return await ctx.respond(f"{user} doesn't seem to hate anything...")
out_hates: list = []
if not isinstance(hates, list):
return
for hated_thing in hates:
(hated_thing,) = hated_thing
out_hates.append(str(hated_thing))
out_hates_str: str = self.join_with_and(out_hates)
out_hates_str: str = self.join_with_and(out_hates)
return await ctx.respond(f"{user} hates {out_hates_str}")
except Exception as e:
await ctx.respond(f"Error: {str(e)}")
traceback.print_exc()
@bridge.bridge_command(aliases=['sarcastichate'])
@bridge.bridge_command(aliases=["sarcastichate"])
async def love(self, ctx, *, thing: str) -> None:
"""
Love <thing>
@ -196,7 +199,7 @@ class LoveHate(commands.Cog):
try:
if discord.utils.raw_mentions(thing):
# There are mentions
thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention
thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention
guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild)
if not guild:
return
@ -205,14 +208,13 @@ class LoveHate(commands.Cog):
return
thing = thing_member.display_name
love: str = await self.db.update(ctx.author.display_name,
thing, 1)
return await ctx.respond(love)
love: str = await self.db.update(ctx.author.display_name, thing, 1)
return await ctx.respond(love)
except Exception as e:
await ctx.respond(f"Error: {str(e)}")
traceback.print_exc()
@bridge.bridge_command(aliases=['sarcasticlove'])
@bridge.bridge_command(aliases=["sarcasticlove"])
async def hate(self, ctx, *, thing: str) -> None:
"""
Hate <thing>
@ -220,7 +222,7 @@ class LoveHate(commands.Cog):
try:
if discord.utils.raw_mentions(thing):
# There are mentions
thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention
thing_id: int = discord.utils.raw_mentions(thing)[0] # First mention
guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id)
if not guild:
return
@ -228,18 +230,17 @@ class LoveHate(commands.Cog):
if not thing_member:
return
thing = thing_member.display_name
hate: str = await self.db.update(ctx.author.display_name,
thing, -1)
return await ctx.respond(hate)
hate: str = await self.db.update(ctx.author.display_name, thing, -1)
return await ctx.respond(hate)
except Exception as e:
await ctx.respond(f"Error: {str(e)}")
traceback.print_exc()
def cog_unload(self) -> None:
# not needed currently
pass
def setup(bot) -> None:
"""Run on Cog Load"""
bot.add_cog(LoveHate(bot))
bot.add_cog(LoveHate(bot))

View File

@ -4,10 +4,7 @@ import json
import io
import asyncio
import random
from typing import (LiteralString,
Optional,
Any,
Union)
from typing import LiteralString, Optional, Any, Union
import aiosqlite as sqlite3
import logging
import textwrap
@ -35,126 +32,152 @@ BOT_CHANIDS = []
TODO: Cleanup new meme leaderboard stuff
"""
class Helper:
"""Meme Helper"""
def load_meme_choices(self) -> None:
"""Load Available Meme Templates from JSON File"""
global meme_choices
memes_file: str|LiteralString = os.path.join(os.path.dirname(__file__), "memes.json")
with open(memes_file, 'r', encoding='utf-8') as f:
memes_file: str | LiteralString = os.path.join(
os.path.dirname(__file__), "memes.json"
)
with open(memes_file, "r", encoding="utf-8") as f:
meme_choices = json.loads(f.read())
class MemeView(discord.ui.View):
"""Meme Selection discord.ui.View"""
helper = Helper()
helper.load_meme_choices()
@discord.ui.select(
placeholder = "Choose a Meme!",
min_values = 1,
max_values = 1,
options = [
discord.SelectOption(
label=meme_label
) for meme_label in meme_choices[0:24]
]
placeholder="Choose a Meme!",
min_values=1,
max_values=1,
options=[
discord.SelectOption(label=meme_label) for meme_label in meme_choices[0:24]
],
)
async def select_callback(self, select: discord.ui.Select,
interaction: discord.Interaction) -> None:
async def select_callback(
self, select: discord.ui.Select, interaction: discord.Interaction
) -> None:
"""Meme Selection Callback"""
if not isinstance(select.values[0], str):
return
modal: discord.ui.Modal = MemeModal(meme=select.values[0], title="Meme Selected")
modal: discord.ui.Modal = MemeModal(
meme=select.values[0], title="Meme Selected"
)
await interaction.response.send_modal(modal)
class MemeModal(discord.ui.Modal):
"""Meme Creation discord.ui.Modal"""
def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.selected_meme: Optional[str] = meme
self.meme_generator = JesusMemeGenerator()
self.TEXT_LIMIT: int = 80
self.add_item(discord.ui.InputText(label="Top Text",
style=discord.InputTextStyle.singleline))
self.add_item(discord.ui.InputText(label="Bottom Text",
style=discord.InputTextStyle.singleline))
self.add_item(
discord.ui.InputText(
label="Top Text", style=discord.InputTextStyle.singleline
)
)
self.add_item(
discord.ui.InputText(
label="Bottom Text", style=discord.InputTextStyle.singleline
)
)
async def callback(self, interaction: discord.Interaction) -> None:
if not self.selected_meme: # No meme selected
return
selected_meme: str = self.selected_meme
if not self.children or len(self.children) < 2: # Invalid request
if not self.selected_meme: # No meme selected
return
selected_meme: str = self.selected_meme
if not self.children or len(self.children) < 2: # Invalid request
return
if not isinstance(self.children[0].value, str) or not isinstance(
self.children[1].value, str
): # Invalid request
return
if not isinstance(self.children[0].value, str)\
or not isinstance(self.children[1].value, str): # Invalid request
return
meme_top_line: str = self.children[0].value.strip()
meme_bottom_line: str = self.children[1].value.strip()
if len(meme_top_line) > self.TEXT_LIMIT or len(meme_bottom_line) > self.TEXT_LIMIT:
await interaction.response.send_message("ERR: Text is limited to 80 characters for each the top and bottom lines.")
if (
len(meme_top_line) > self.TEXT_LIMIT
or len(meme_bottom_line) > self.TEXT_LIMIT
):
await interaction.response.send_message(
"ERR: Text is limited to 80 characters for each the top and bottom lines."
)
return
meme_link: Optional[str] = await self.meme_generator.create_meme(top_line=meme_top_line,
bottom_line=meme_bottom_line, meme=selected_meme)
meme_link: Optional[str] = await self.meme_generator.create_meme(
top_line=meme_top_line, bottom_line=meme_bottom_line, meme=selected_meme
)
if not meme_link:
await interaction.response.send_message("Failed!")
return
return
embed: discord.Embed = discord.Embed(title="Generated Meme")
embed.set_image(url=meme_link)
embed.add_field(name="Meme", value=selected_meme, inline=True)
await interaction.response.send_message(embeds=[embed])
return
class Meme(commands.Cog):
"""Meme Cog for Havoc"""
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.stats_db_path: LiteralString = os.path.join("/usr/local/share",
"sqlite_dbs", "stats.db")
self.stats_db_path: LiteralString = os.path.join(
"/usr/local/share", "sqlite_dbs", "stats.db"
)
self.meme_choices: list = []
self.meme_counter: int = 0
self.THREADS: dict[str, dict[int, list]] = {
# Format: Guild1: [ChanId : [Webhook, ThreadId], Guild2: [ChanId : [Webhook, ThreadId]
'comic_explosm': {
"comic_explosm": {
1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367],
1306414795049926676: [constants.EXPLOSM_WEBHOOK2, 1306416492304138364],
},
'comic_xkcd': {
1298729744216359055: [constants.XKCD_WEBHOOK, 1299165928755433483],
"comic_xkcd": {
1298729744216359055: [constants.XKCD_WEBHOOK, 1299165928755433483],
1306414795049926676: [constants.XKCD_WEBHOOK2, 1306416681991798854],
},
'comic_smbc': {
"comic_smbc": {
1298729744216359055: [constants.SMBC_WEBHOOK, 1299166071038808104],
1306414795049926676: [constants.SMBC_WEBHOOK2, 1306416842511745024],
},
'comic_qc': {
1298729744216359055: [constants.QC_WEBHOOK, 1299392115364593674],
"comic_qc": {
1298729744216359055: [constants.QC_WEBHOOK, 1299392115364593674],
1306414795049926676: [constants.QC_WEBHOOK2, 1306417084774744114],
},
'comic_dino': {
1298729744216359055: [constants.DINO_WEBHOOK, 1299771918886506557],
"comic_dino": {
1298729744216359055: [constants.DINO_WEBHOOK, 1299771918886506557],
1306414795049926676: [constants.DINO_WEBHOOK2, 1306417286713704548],
}
},
}
self.NO_THREAD_WEBHOOKS: dict[str, list] = {
'theonion': [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2],
'thn': [constants.THN_WEBHOOK],
'memes': [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2],
"theonion": [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2],
"thn": [constants.THN_WEBHOOK],
"memes": [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2],
}
global BOT_CHANIDS
BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit
BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit
self.meme_stream_loop.start()
self.explosm_loop.start()
self.update_meme_lb.start()
asyncio.get_event_loop().create_task(self.init_meme_leaderboard())
def is_spamchan() -> bool: # type: ignore
def is_spamchan() -> bool: # type: ignore
"""Check if channel is spamchan"""
def predicate(ctx):
try:
if not ctx.channel.id in BOT_CHANIDS:
@ -163,24 +186,23 @@ class Meme(commands.Cog):
except:
traceback.print_exc()
return False
return commands.check(predicate) # type: ignore
async def leaderboard_increment(self,
uid: int) -> None:
return commands.check(predicate) # type: ignore
async def leaderboard_increment(self, uid: int) -> None:
"""
Increment leaderboard for uid
Args:
uid (int):
uid (int):
Returns:
None
"""
if not uid in self.meme_leaderboard:
self.meme_leaderboard[uid] = 1
else:
self.meme_leaderboard[uid] += 1
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
"""Attempts both insert/update"""
query_1: str = "UPDATE memes SET count = count + 1 WHERE discord_uid = ?"
@ -199,34 +221,34 @@ class Meme(commands.Cog):
try:
await self.update_meme_lb()
except Exception as e:
logging.info("Failed to update meme leaderboard following increment: %s",
str(e))
logging.info(
"Failed to update meme leaderboard following increment: %s", str(e)
)
async def init_meme_leaderboard(self) -> None:
"""
INIT MEME LEADERBOARD
"""
self.meme_leaderboard: dict [int, int] = {}
self.meme_leaderboard: dict[int, int] = {}
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
db_query: str = "SELECT discord_uid, count FROM memes WHERE count > 0"
async with db_conn.execute(db_query) as db_cursor:
results = await db_cursor.fetchall()
for result in results:
uid = result['discord_uid']
count = result['count']
self.meme_leaderboard[uid] = count
uid = result["discord_uid"]
count = result["count"]
self.meme_leaderboard[uid] = count
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Run on Bot Ready"""
await self.init_meme_leaderboard()
async def do_autos(self, only_comics: Optional[bool] = False) -> None:
"""
Run Auto Posters
Args:
only_comics (Optional[bool]): default False
Returns:
@ -243,7 +265,7 @@ class Meme(commands.Cog):
thn_grabber = thng.THNGrabber()
explosm_comics: list[Optional[tuple]] = []
xkcd_comics: list[Optional[tuple]] = []
smbc_comics: list[Optional[tuple]] = []
smbc_comics: list[Optional[tuple]] = []
dino_comics: list[Optional[tuple]] = []
onions: list[Optional[tuple]] = []
thns: list[Optional[tuple]] = []
@ -284,28 +306,34 @@ class Meme(commands.Cog):
except:
traceback.print_exc()
agents: list[str] = constants.HTTP_UA_LIST
headers: dict = {
'User-Agent': random.choice(agents)
}
headers: dict = {"User-Agent": random.choice(agents)}
if not only_comics:
try:
for meme in memes:
if not meme:
continue
(meme_id, meme_title, meme_url) = meme
request = requests.get(meme_url, stream=True, timeout=(5, 30), headers=headers)
request = requests.get(
meme_url, stream=True, timeout=(5, 30), headers=headers
)
if not request.status_code == 200:
continue
meme_content: bytes = request.raw.read()
for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes', {}):
for meme_hook in self.NO_THREAD_WEBHOOKS.get("memes", {}):
meme_image: io.BytesIO = io.BytesIO(meme_content)
ext: str = meme_url.split(".")[-1]\
.split("?")[0].split("&")[0]
ext: str = (
meme_url.split(".")[-1].split("?")[0].split("&")[0]
)
async with ClientSession() as session:
webhook: discord.Webhook = discord.Webhook.from_url(meme_hook,
session=session)
await webhook.send(file=discord.File(meme_image,
filename=f'img.{ext}'), username="r/memes")
webhook: discord.Webhook = discord.Webhook.from_url(
meme_hook, session=session
)
await webhook.send(
file=discord.File(
meme_image, filename=f"img.{ext}"
),
username="r/memes",
)
await asyncio.sleep(2)
except:
pass
@ -315,25 +343,33 @@ class Meme(commands.Cog):
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request = requests.get(
comic_url, stream=True, timeout=(5, 20), headers=headers
)
comic_request.raise_for_status()
comic_content: bytes = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
ext = comic_url.split(".")[-1].split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_explosm', {}).items():
for chanid, _hook in self.THREADS.get(
"comic_explosm", {}
).items():
comic_image: io.BytesIO = io.BytesIO(comic_content)
channel: int = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
webhook = discord.Webhook.from_url(
hook_uri, session=session
)
_channel: Any = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Cyanide & Happiness", thread=thread)
await webhook.send(
f"**{comic_title}**",
file=discord.File(comic_image, filename=f"img.{ext}"),
username="Cyanide & Happiness",
thread=thread,
)
await asyncio.sleep(2)
except:
pass
@ -342,119 +378,140 @@ class Meme(commands.Cog):
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(
comic_url, stream=True, timeout=(5, 20), headers=headers
)
comic_request.raise_for_status()
comic_content = comic_request.raw.read()
comic_image = io.BytesIO(comic_request.raw.read())
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
ext = comic_url.split(".")[-1].split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_xkcd', {}).items():
for chanid, _hook in self.THREADS.get("comic_xkcd", {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
webhook = discord.Webhook.from_url(
hook_uri, session=session
)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="xkcd", thread=thread)
await webhook.send(
f"**{comic_title}**",
file=discord.File(comic_image, filename=f"img.{ext}"),
username="xkcd",
thread=thread,
)
await asyncio.sleep(2)
except:
pass
pass
try:
for comic in smbc_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(
comic_url, stream=True, timeout=(5, 20), headers=headers
)
comic_request.raise_for_status()
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
ext = comic_url.split(".")[-1].split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_smbc', {}).items():
for chanid, _hook in self.THREADS.get("comic_smbc", {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
webhook = discord.Webhook.from_url(
hook_uri, session=session
)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="SMBC", thread=thread)
await webhook.send(
f"**{comic_title}**",
file=discord.File(comic_image, filename=f"img.{ext}"),
username="SMBC",
thread=thread,
)
await asyncio.sleep(2)
except:
pass
pass
try:
for comic in qc_comics:
logging.debug("Trying QC...")
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_url = regex.sub(r'^http://ww\.', 'http://www.',
comic_url)
comic_url = regex.sub(r'\.pmg$', '.png',
comic_url)
comic_request = requests.get(comic_url, stream=True,
timeout=(5, 20), headers=headers)
comic_title = discord.utils.escape_markdown(comic_title)
comic_url = regex.sub(r"^http://ww\.", "http://www.", comic_url)
comic_url = regex.sub(r"\.pmg$", ".png", comic_url)
comic_request = requests.get(
comic_url, stream=True, timeout=(5, 20), headers=headers
)
comic_request.raise_for_status()
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
ext = comic_url.split(".")[-1].split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_qc', {}).items():
for chanid, _hook in self.THREADS.get("comic_qc", {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
webhook = discord.Webhook.from_url(
hook_uri, session=session
)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Questionable Content", thread=thread)
await webhook.send(
f"**{comic_title}**",
file=discord.File(comic_image, filename=f"img.{ext}"),
username="Questionable Content",
thread=thread,
)
await asyncio.sleep(2)
except:
traceback.print_exc()
pass
pass
try:
for comic in dino_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(
comic_url, stream=True, timeout=(5, 20), headers=headers
)
comic_request.raise_for_status()
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
ext = comic_url.split(".")[-1].split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_dino', {}).items():
for chanid, _hook in self.THREADS.get("comic_dino", {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(
hook_uri, session=session
)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Dinosaur Comics", thread=thread)
await asyncio.sleep(2)
await webhook.send(
f"**{comic_title}**",
file=discord.File(comic_image, filename=f"img.{ext}"),
username="Dinosaur Comics",
thread=thread,
)
await asyncio.sleep(2)
except:
pass
try:
@ -462,15 +519,20 @@ class Meme(commands.Cog):
if not onion:
continue
(onion_title, onion_description, onion_link, onion_video) = onion
onion_description = textwrap.wrap(text=onion_description,
width=860, max_lines=1)[0]
onion_description = textwrap.wrap(
text=onion_description, width=860, max_lines=1
)[0]
embed: discord.Embed = discord.Embed(title=onion_title)
embed.add_field(name="Content", value=f"{onion_description[0:960]}\n-# {onion_link}")
embed.add_field(
name="Content",
value=f"{onion_description[0:960]}\n-# {onion_link}",
)
async with ClientSession() as session:
for hook in self.NO_THREAD_WEBHOOKS.get('theonion', {}):
for hook in self.NO_THREAD_WEBHOOKS.get("theonion", {}):
hook_uri = hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
webhook = discord.Webhook.from_url(
hook_uri, session=session
)
await webhook.send(embed=embed, username="The Onion")
if onion_video:
await webhook.send(f"^ video: {onion_video}")
@ -483,16 +545,20 @@ class Meme(commands.Cog):
if not thn:
continue
(thn_title, thn_description, thn_link, thn_pubdate, thn_video) = thn
thn_description = textwrap.wrap(text=thn_description,
width=860, max_lines=1)[0]
thn_description = textwrap.wrap(
text=thn_description, width=860, max_lines=1
)[0]
embed = discord.Embed(title=thn_title)
embed.add_field(name="Content", value=f"{thn_description[0:960]}\n-# {thn_link}")
embed.add_field(
name="Content", value=f"{thn_description[0:960]}\n-# {thn_link}"
)
embed.add_field(name="Published", value=thn_pubdate, inline=False)
async with ClientSession() as session:
for hook in self.NO_THREAD_WEBHOOKS.get('thn', {}):
for hook in self.NO_THREAD_WEBHOOKS.get("thn", {}):
hook_uri = hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
webhook = discord.Webhook.from_url(
hook_uri, session=session
)
await webhook.send(embed=embed, username="The Hacker News")
if thn_video:
await webhook.send(f"^ video: {thn_video}")
@ -507,30 +573,30 @@ class Meme(commands.Cog):
async def meme_stream_loop(self) -> None:
"""Meme Stream Loop (r/memes)"""
try:
await asyncio.sleep(10) # Try to ensure we are ready first
await asyncio.sleep(10) # Try to ensure we are ready first
self.meme_counter += 1
if self.meme_counter == 1:
return await self.do_autos(only_comics=True) # Skip first iteration!
return await self.do_autos(only_comics=True) # Skip first iteration!
await self.do_autos()
except:
traceback.print_exc()
traceback.print_exc()
@tasks.loop(hours=0.5)
async def explosm_loop(self) -> None:
"""Comic Loop"""
try:
await asyncio.sleep(10) # Try to ensure we are ready first
await asyncio.sleep(10) # Try to ensure we are ready first
await self.do_autos(only_comics=True)
except:
traceback.print_exc()
@bridge.bridge_command() # type: ignore
@is_spamchan()
async def meme(self, ctx) -> None:
traceback.print_exc()
@bridge.bridge_command() # type: ignore
@is_spamchan()
async def meme(self, ctx) -> None:
"""Create Meme"""
await ctx.respond(view=MemeView())
await ctx.respond(view=MemeView())
@bridge.bridge_command(hidden=True)
@commands.is_owner()
async def domemestream(self, ctx) -> None:
@ -552,7 +618,7 @@ class Meme(commands.Cog):
except:
await ctx.respond("Fuck! :(", ephemeral=True)
traceback.print_exc()
@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
"""
@ -560,50 +626,56 @@ class Meme(commands.Cog):
Also monitors for messages to #memes-top-10 to autodelete, only Havoc may post in #memes-top-10!
"""
lb_chanid: int = 1352373745108652145
if not self.bot.user: # No valid client instance
if not self.bot.user: # No valid client instance
return
if not isinstance(message.channel, discord.TextChannel):
return
if message.channel.id == lb_chanid\
and not message.author.id == self.bot.user.id:
if (
message.channel.id == lb_chanid
and not message.author.id == self.bot.user.id
):
"""Message to #memes-top-10 not by Havoc, delete it"""
await message.delete(reason=f"Messages to #{message.channel.name} are not allowed")
await message.delete(
reason=f"Messages to #{message.channel.name} are not allowed"
)
removal_embed: discord.Embed = discord.Embed(
title="Message Deleted",
description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed."
description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed.",
)
await message.author.send(embed=removal_embed)
if message.author.id == self.bot.user.id: # Bots own message
if message.author.id == self.bot.user.id: # Bots own message
return
if not message.guild:
return
if not message.channel.id == 1147229098544988261: # Not meme channel
if not message.channel.id == 1147229098544988261: # Not meme channel
return
if not message.attachments: # No attachments to consider a meme
return
if not message.attachments: # No attachments to consider a meme
return
await self.leaderboard_increment(message.author.id)
async def get_top(self, n: int = 10) -> Optional[list[tuple]]:
"""
Get top (n=10) Memes
Args:
n (int): Number of top results to return, default 10
Returns:
Optional[dict]
Optional[dict]
"""
try:
out_top: list[tuple[int, int]] = []
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC"
query: str = (
"SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC"
)
async with db_conn.execute(query) as db_cursor:
db_result = await db_cursor.fetchall()
for res in db_result:
uid = res['discord_uid']
count = res['count']
uid = res["discord_uid"]
count = res["count"]
out_top.append((uid, count))
# Check for and remove missing members
guild_id: int = 1145182936002482196
@ -615,19 +687,19 @@ class Meme(commands.Cog):
member: Optional[discord.Member] = guild.get_member(uid)
if not member:
out_top.pop(x)
return out_top[0:(n+1)]
return out_top[0 : (n + 1)]
except:
traceback.print_exc()
return None
async def get_top_embed(self, n:int = 10) -> Optional[discord.Embed]:
async def get_top_embed(self, n: int = 10) -> Optional[discord.Embed]:
"""
Get Top Memes Embed
Args:
n (int): Number of top results to return, default 10
Returns:
Optional[discord.Embed]
Optional[discord.Embed]
"""
guild_id: int = 1145182936002482196
guild: Optional[discord.Guild] = self.bot.get_guild(guild_id)
@ -643,13 +715,15 @@ class Meme(commands.Cog):
if not member:
continue
display_name: str = member.display_name
top_formatted += f"{x+1}. **{discord.utils.escape_markdown(display_name)}**: *{count}*\n"
top_formatted += (
f"{x+1}. **{discord.utils.escape_markdown(display_name)}**: *{count}*\n"
)
top_formatted = top_formatted.strip()
embed: discord.Embed = discord.Embed(title=f"Top {n} Memes",
description=top_formatted,
colour=0x25bd6b)
return embed
embed: discord.Embed = discord.Embed(
title=f"Top {n} Memes", description=top_formatted, colour=0x25BD6B
)
return embed
@tasks.loop(seconds=30, reconnect=True)
async def update_meme_lb(self) -> None:
"""Update the Meme Leaderboard"""
@ -661,27 +735,33 @@ class Meme(commands.Cog):
if not isinstance(channel, discord.TextChannel):
return
message_to_edit = await channel.fetch_message(message_id)
await message_to_edit.edit(embed=top_embed,
content="## This message will automatically update periodically.")
await message_to_edit.edit(
embed=top_embed,
content="## This message will automatically update periodically.",
)
except:
traceback.print_exc()
traceback.print_exc()
@bridge.bridge_command(hidden=True)
@commands.is_owner()
async def doembed(self, ctx) -> None:
"""Do Meme Embed"""
meme_lb_chan_id: int = 1352373745108652145
meme_lb_chan: Union[discord.TextChannel, Any] = self.bot.get_channel(meme_lb_chan_id)
meme_lb_chan: Union[discord.TextChannel, Any] = self.bot.get_channel(
meme_lb_chan_id
)
embed = await self.get_top_embed()
if embed:
await meme_lb_chan.send(embed=embed)
else:
await ctx.respond("NO embed :(")
def cog_unload(self) -> None:
self.meme_stream_loop.cancel()
self.explosm_loop.cancel()
self.update_meme_lb.cancel()
def setup(bot) -> None:
"""Run on Cog Load"""
bot.add_cog(Meme(bot))
bot.add_cog(Meme(bot))

File diff suppressed because it is too large Load Diff

View File

@ -9,110 +9,114 @@ from discord.ext import bridge, commands
from disc_havoc import Havoc
import util
class Owner(commands.Cog):
"""Owner Cog for Havoc"""
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.former_roles_store: dict[int, list[discord.Role]] = {}
self._temperature: int = random.randrange(20, 30)
@bridge.bridge_command(guild_ids=[1145182936002482196])
async def temperature(self, ctx, temp: Optional[int|str] = None) -> None:
async def temperature(self, ctx, temp: Optional[int | str] = None) -> None:
"""
Set Temperature
"""
if not temp:
return await ctx.respond(f"The current temperature is: {self._temperature} °C")
return await ctx.respond(
f"The current temperature is: {self._temperature} °C"
)
if not self.bot.is_owner(ctx.author):
return await ctx.respond("I am afraid I can't let you do that.")
try:
_temperature: int = int(temp)
except:
return await ctx.respond("Invalid input")
return await ctx.respond("Invalid input")
if _temperature < -15:
return await ctx.respond("Too cold! (-15°C minimum)")
elif _temperature > 35:
return await ctx.respond("Too hot! (35°C maximum)")
self._temperature = _temperature
return await ctx.respond(f"As per your request, I have adjusted the temperature to {_temperature} °C.")
return await ctx.respond(
f"As per your request, I have adjusted the temperature to {_temperature} °C."
)
@bridge.bridge_command()
@commands.is_owner()
async def editmsg(self, ctx,
msgid: str,
*,
newcontent: str
) -> None:
async def editmsg(self, ctx, msgid: str, *, newcontent: str) -> None:
"""
Edit a message previously sent by the bot
"""
try:
message: Optional[discord.Message] = self.bot.get_message(int(msgid))
if not message:
await ctx.respond(f"**Failed:** Message {msgid} not found.",
ephemeral=True)
await ctx.respond(
f"**Failed:** Message {msgid} not found.", ephemeral=True
)
return None
await message.edit(content=newcontent)
await ctx.respond("**Done!**", ephemeral=True)
except Exception as e:
await ctx.respond(f"**Failed:** {str(e)}",
ephemeral=True)
await ctx.respond(f"**Failed:** {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def reload(self, ctx) -> None:
"""
Reload Cogs
"""
self.bot.load_exts(False)
await ctx.respond("Reloaded!", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def say(self, ctx, *,
parameters: str) -> None:
async def say(self, ctx, *, parameters: str) -> None:
"""
Make me say something in a channel
"""
_parameters: list[str] = parameters.split(" ")
if not len(_parameters) > 1:
return await ctx.respond("**Error**: Incorrect command usage; required: <chan> <msg>", ephemeral=True)
return await ctx.respond(
"**Error**: Incorrect command usage; required: <chan> <msg>",
ephemeral=True,
)
channel: str = _parameters[0]
channel_mentions: list[int] = discord.utils.raw_channel_mentions(channel)
if channel_mentions:
channel = str(channel_mentions[0])
msg: str = " ".join(_parameters[1:])
await util.discord_helpers.send_message(self.bot, channel=channel,
message=msg)
await util.discord_helpers.send_message(self.bot, channel=channel, message=msg)
return await ctx.respond("**Done.**", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def chgstatus(self, ctx, *,
status: Optional[str] = None) -> None:
async def chgstatus(self, ctx, *, status: Optional[str] = None) -> None:
"""
Change bots status
"""
if not status:
return await ctx.respond("ERR: No status provided to change to!",
ephemeral=True)
await self.bot.change_presence(status=discord.Status.online,
activity=discord.CustomActivity(name=status.strip()))
return await ctx.respond(
"ERR: No status provided to change to!", ephemeral=True
)
await self.bot.change_presence(
status=discord.Status.online,
activity=discord.CustomActivity(name=status.strip()),
)
await ctx.respond("Done!", ephemeral=True)
@commands.message_command(name="Remove Messages Starting Here")
@commands.is_owner()
async def purge(self, ctx, message: discord.Message) -> None:
"""
Purge Messages
Args:
ctx (Any): Discord context
message (discord.Message): Discord message
@ -120,10 +124,12 @@ class Owner(commands.Cog):
None
"""
try:
await ctx.channel.purge(after=message,
bulk=True,
limit=900000,
reason=f"Purge initiated by {ctx.author.display_name}")
await ctx.channel.purge(
after=message,
bulk=True,
limit=900000,
reason=f"Purge initiated by {ctx.author.display_name}",
)
await message.delete(reason=f"Purge initiated by {ctx.author.display_name}")
await ctx.respond("**Done!**")
# Wait 3 seconds, then delete interaction
@ -133,13 +139,13 @@ class Owner(commands.Cog):
except Exception as e:
traceback.print_exc()
return await ctx.respond(f"**ERR: {str(e)}**")
@commands.message_command(name="Move to Memes")
@commands.is_owner()
async def movememe(self, ctx, message: discord.Message) -> None:
"""
Move to Memes
Args:
ctx (Any): Discord context
message (discord.Message): Discord message
@ -149,34 +155,39 @@ class Owner(commands.Cog):
try:
if not isinstance(message.channel, discord.TextChannel):
return
memes_channel: discord.TextChannel = ctx.guild.get_channel(1147229098544988261)
message_content: str = message.content
memes_channel: discord.TextChannel = ctx.guild.get_channel(
1147229098544988261
)
message_content: str = message.content
message_author: str = message.author.display_name
message_channel: str = message.channel.name
_file: Optional[discord.File] = None
if message.attachments:
for item in message.attachments:
if item.url and len(item.url) >= 20:
image: io.BytesIO = io.BytesIO(requests.get(item.url, stream=True,
timeout=20).raw.read())
ext: str = item.url.split(".")[-1]\
.split("?")[0].split("&")[0]
_file = discord.File(image, filename=f'img.{ext}')
image: io.BytesIO = io.BytesIO(
requests.get(item.url, stream=True, timeout=20).raw.read()
)
ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
_file = discord.File(image, filename=f"img.{ext}")
if not _file:
return # No file to move
await memes_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...*\n**{message_author}:** {message_content}", file=_file)
return # No file to move
await memes_channel.send(
f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...*\n**{message_author}:** {message_content}",
file=_file,
)
await message.delete()
await ctx.respond("OK!", ephemeral=True)
except:
traceback.print_exc()
return await ctx.respond("Failed! :(", ephemeral=True)
@commands.message_command(name="Move to Drugs")
@commands.is_owner()
async def movedrugs(self, ctx, message: discord.Message) -> None:
"""
Move to Drugs
Args:
ctx (Any): Discord context
message (discord.Message): Discord message
@ -186,7 +197,9 @@ class Owner(commands.Cog):
try:
if not isinstance(message.channel, discord.TextChannel):
return
drugs_channel: discord.TextChannel = ctx.guild.get_channel(1172247451047034910)
drugs_channel: discord.TextChannel = ctx.guild.get_channel(
1172247451047034910
)
message_content: str = message.content
message_author: str = message.author.display_name
message_channel: str = message.channel.name
@ -194,27 +207,30 @@ class Owner(commands.Cog):
if message.attachments:
for item in message.attachments:
if item.url and len(item.url) >= 20:
image: io.BytesIO = io.BytesIO(requests.get(item.url, stream=True,
timeout=20).raw.read())
ext: str = item.url.split(".")[-1]\
.split("?")[0].split("&")[0]
_file = discord.File(image, filename=f'img.{ext}')
image: io.BytesIO = io.BytesIO(
requests.get(item.url, stream=True, timeout=20).raw.read()
)
ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
_file = discord.File(image, filename=f"img.{ext}")
if not _file:
return # No file to move
await drugs_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...\
*\n**{message_author}:** {message_content}", file=_file)
return # No file to move
await drugs_channel.send(
f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...\
*\n**{message_author}:** {message_content}",
file=_file,
)
await message.delete()
await ctx.respond("OK!", ephemeral=True)
except:
traceback.print_exc()
return await ctx.respond("Failed! :(", ephemeral=True)
@commands.message_command(name="Move to fun-house")
@commands.is_owner()
async def movefunhouse(self, ctx, message: discord.Message) -> None:
"""
Move to fun-house
Args:
ctx (Any): Discord context
message (discord.Message): Discord message
@ -224,7 +240,9 @@ class Owner(commands.Cog):
try:
if not isinstance(message.channel, discord.TextChannel):
return
funhouse_channel: discord.TextChannel = ctx.guild.get_channel(1213160512364478607)
funhouse_channel: discord.TextChannel = ctx.guild.get_channel(
1213160512364478607
)
message_content: str = message.content
message_author: str = message.author.display_name
message_channel: str = message.channel.name
@ -232,25 +250,27 @@ class Owner(commands.Cog):
if message.attachments:
for item in message.attachments:
if item.url and len(item.url) >= 20:
image: io.BytesIO = io.BytesIO(requests.get(item.url, stream=True,
timeout=20).raw.read())
ext: str = item.url.split(".")[-1]\
.split("?")[0].split("&")[0]
_file = discord.File(image, filename=f'img.{ext}')
await funhouse_channel.send(f"*Performing bureaucratic duties (this didn't belong in #{message_channel})\
...*\n**{message_author}:** {message_content}")
image: io.BytesIO = io.BytesIO(
requests.get(item.url, stream=True, timeout=20).raw.read()
)
ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
_file = discord.File(image, filename=f"img.{ext}")
await funhouse_channel.send(
f"*Performing bureaucratic duties (this didn't belong in #{message_channel})\
...*\n**{message_author}:** {message_content}"
)
await message.delete()
await ctx.respond("OK!", ephemeral=True)
except:
traceback.print_exc()
return await ctx.respond("Failed! :(", ephemeral=True)
@commands.user_command(name="Einsperren!", guild_ids=[145182936002482196])
@commands.is_owner()
async def einsperren(self, ctx, member: discord.Member) -> None:
"""
Einsperren!
Args:
ctx (Any): Discord context
member (discord.Member): Discord member
@ -259,16 +279,23 @@ class Owner(commands.Cog):
"""
try:
if not ctx.guild.id == 1145182936002482196:
return # Not home server!
return # Not home server!
if not member.roles:
return # No roles
return # No roles
audit_reason: str = f"Einsperren von {ctx.user.display_name}"
member = ctx.guild.get_member(member.id)
member_display: str = member.display_name
einsperren_role: discord.Role = ctx.guild.get_role(1235415059300093973) if ctx.guild.id != 1145182936002482196\
einsperren_role: discord.Role = (
ctx.guild.get_role(1235415059300093973)
if ctx.guild.id != 1145182936002482196
else ctx.guild.get_role(1235406301614309386)
member_roles: list = [role for role in member.roles if not role.name == "@everyone"]
member_role_names: list[str] = [str(role.name).lower() for role in member_roles]
)
member_roles: list = [
role for role in member.roles if not role.name == "@everyone"
]
member_role_names: list[str] = [
str(role.name).lower() for role in member_roles
]
opers_chan: discord.TextChannel = ctx.guild.get_channel(1181416083287187546)
if not "einsperren" in member_role_names:
try:
@ -276,28 +303,37 @@ class Owner(commands.Cog):
self.former_roles_store.pop(member.id)
self.former_roles_store[member.id] = member.roles
except:
pass # Safe to ignore
pass # Safe to ignore
try:
await member.edit(roles=[einsperren_role], reason=audit_reason)
await ctx.respond(f"Gesendet {member_display} an einsperren.", ephemeral=True)
await opers_chan.send(f"@everyone: {ctx.user.display_name} gesendet {member_display} an einsperren.")
await ctx.respond(
f"Gesendet {member_display} an einsperren.", ephemeral=True
)
await opers_chan.send(
f"@everyone: {ctx.user.display_name} gesendet {member_display} an einsperren."
)
except:
traceback.print_exc()
return await ctx.respond("GOTTVERDAMMT!!", ephemeral=True)
self.former_roles_store[member.id] = member.roles
if not member.id in self.former_roles_store:
await member.edit(roles=[]) # No roles
else:
former_roles: list = self.former_roles_store.get(member.id, [0])
await member.edit(roles=former_roles, reason=f"De-{audit_reason}")
await ctx.respond(f"{member_display} wurde von der Einsperre befreit.", ephemeral=True)
await opers_chan.send(f"{member_display} wurde von {ctx.user.display_name} aus der Einsperre befreit.")
await ctx.respond(
f"{member_display} wurde von der Einsperre befreit.", ephemeral=True
)
await opers_chan.send(
f"{member_display} wurde von {ctx.user.display_name} aus der Einsperre befreit."
)
except Exception as e:
traceback.print_exc()
return await ctx.respond(f"ERR: {str(e)}", ephemeral=True)
def setup(bot) -> None:
"""Run on Cog Load"""
bot.add_cog(Owner(bot))
bot.add_cog(Owner(bot))

View File

@ -6,38 +6,44 @@ from util.radio_util import get_now_playing, skip
import discord
from disc_havoc import Havoc
class Radio(commands.Cog):
"""Radio Cog for Havoc"""
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.channels: dict[str, tuple] = {
'sfm': (1145182936002482196, 1221615558492029050), # Tuple: Guild Id, Chan Id
}
"sfm": (
1145182936002482196,
1221615558492029050,
), # Tuple: Guild Id, Chan Id
}
self.STREAM_URL: str = "https://stream.codey.lol/sfm.ogg"
self.LAST_NP_TRACK: Optional[str] = None
try:
self.radio_state_loop.cancel()
except Exception as e:
logging.debug("Failed to cancel radio_state_loop: %s",
str(e))
logging.debug("Failed to cancel radio_state_loop: %s", str(e))
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Run on Bot Ready"""
await self.radio_init()
def is_radio_chan(): # type: ignore
def is_radio_chan(): # type: ignore
"""Check if channel is radio chan"""
def predicate(ctx):
try:
return ctx.channel.id == 1221615558492029050
except:
traceback.print_exc()
return False
return commands.check(predicate)
return commands.check(predicate)
@bridge.bridge_command()
@commands.is_owner()
@commands.is_owner()
async def reinitradio(self, ctx) -> None:
"""
Reinitialize serious.FM
@ -45,15 +51,15 @@ class Radio(commands.Cog):
loop: discord.asyncio.AbstractEventLoop = self.bot.loop
loop.create_task(self.radio_init())
await ctx.respond("Done!", ephemeral=True)
async def radio_init(self) -> None:
"""Init Radio"""
try:
(radio_guild, radio_chan) = self.channels['sfm']
(radio_guild, radio_chan) = self.channels["sfm"]
guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
if not guild:
return
channel = guild.get_channel(radio_chan)
channel = guild.get_channel(radio_chan)
if not isinstance(channel, discord.VoiceChannel):
return
if not self.bot.voice_clients:
@ -62,8 +68,7 @@ class Radio(commands.Cog):
try:
self.radio_state_loop.cancel()
except Exception as e:
logging.debug("Failed to cancel radio_state_loop: %s",
str(e))
logging.debug("Failed to cancel radio_state_loop: %s", str(e))
self.radio_state_loop.start()
logging.info("radio_state_loop task started!")
except:
@ -72,12 +77,12 @@ class Radio(commands.Cog):
except:
traceback.print_exc()
return
@tasks.loop(seconds=5.0)
async def radio_state_loop(self) -> None:
"""Radio State Loop"""
try:
(radio_guild, radio_chan) = self.channels['sfm']
(radio_guild, radio_chan) = self.channels["sfm"]
try:
vc: discord.VoiceProtocol = self.bot.voice_clients[-1]
except:
@ -90,41 +95,47 @@ class Radio(commands.Cog):
return
await channel.connect()
vc = self.bot.voice_clients[-1]
if not vc.is_playing() or vc.is_paused(): # type: ignore
"""
Mypy does not seem aware of the is_playing, play, and is_paused methods,
but they exist.
"""
logging.info("Detected VC not playing... playing!")
source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL,
before_options="-timeout 3000000")
vc.play(source, # type: ignore
after=lambda e: logging.info("Error: %s", e) if e\
else None)
# Get Now Playing
source: discord.FFmpegAudio = discord.FFmpegOpusAudio(
self.STREAM_URL, before_options="-timeout 3000000"
)
vc.play(
source, # type: ignore
after=lambda e: logging.info("Error: %s", e) if e else None,
)
# Get Now Playing
np_track: Optional[str] = await get_now_playing()
if np_track and not self.LAST_NP_TRACK == np_track:
self.LAST_NP_TRACK = np_track
await self.bot.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=np_track))
await self.bot.change_presence(
activity=discord.Activity(
type=discord.ActivityType.listening, name=np_track
)
)
except:
traceback.print_exc()
@bridge.bridge_command()
@commands.is_owner()
async def skip(self, ctx) -> None:
"""
Skip - Convenience Command
"""
await skip()
return await ctx.respond("OK", ephemeral=True)
def cog_unload(self) -> None:
"""Run on Cog Unload"""
"""Run on Cog Unload"""
self.radio_state_loop.cancel()
def setup(bot) -> None:
"""Run on Cog Load"""
bot.add_cog(Radio(bot))
bot.add_cog(Radio(bot))

View File

@ -11,18 +11,23 @@ from disc_havoc import Havoc
BOT_CHANIDS = []
class Sing(commands.Cog):
"""Sing Cog for Havoc"""
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.utility = Utility()
global BOT_CHANIDS
BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit
self.control_strip_regex: Pattern = regex.compile(r"\x0f|\x1f|\035|\002|\u2064|\x02|(\x03([0-9]{1,2}))|(\x03|\003)(?:\d{1,2}(?:,\d{1,2})?)?",
regex.UNICODE)
def is_spamchan(): # type: ignore
BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit
self.control_strip_regex: Pattern = regex.compile(
r"\x0f|\x1f|\035|\002|\u2064|\x02|(\x03([0-9]{1,2}))|(\x03|\003)(?:\d{1,2}(?:,\d{1,2})?)?",
regex.UNICODE,
)
def is_spamchan(): # type: ignore
"""Check if channel is spam chan"""
def predicate(ctx):
try:
if not ctx.channel.id in BOT_CHANIDS:
@ -31,18 +36,19 @@ class Sing(commands.Cog):
except:
traceback.print_exc()
return False
return commands.check(predicate)
@bridge.bridge_command(aliases=['sing'])
async def s(self, ctx, *,
song: Optional[str] = None) -> None:
return commands.check(predicate)
@bridge.bridge_command(aliases=["sing"])
async def s(self, ctx, *, song: Optional[str] = None) -> None:
"""
Search for lyrics, format is artist : song. Also reads activity.
"""
try:
with ctx.channel.typing():
interaction: bool = isinstance(ctx,
discord.ext.bridge.BridgeApplicationContext)
interaction: bool = isinstance(
ctx, discord.ext.bridge.BridgeApplicationContext
)
activity: Optional[discord.Activity] = None
if not song:
if not ctx.author.activities:
@ -51,68 +57,90 @@ class Sing(commands.Cog):
for _activity in ctx.author.activities:
if _activity.type == discord.ActivityType.listening:
activity = _activity
if not activity:
return await ctx.respond("**Error**: No song specified, no activity found to read.")
return await ctx.respond(
"**Error**: No song specified, no activity found to read."
)
if interaction:
await ctx.respond("*Searching...*") # Must respond to interactions within 3 seconds, per Discord
await ctx.respond(
"*Searching...*"
) # Must respond to interactions within 3 seconds, per Discord
parsed = self.utility.parse_song_input(song, activity)
if isinstance(parsed, tuple):
(search_artist, search_song, search_subsearch) = parsed
# await ctx.respond(f"So, {search_song} by {search_artist}? Subsearch: {search_subsearch} I will try...") # Commented, useful for debugging
search_result: Optional[list] = await self.utility.lyric_search(search_artist, search_song,
search_subsearch)
search_result: Optional[list] = await self.utility.lyric_search(
search_artist, search_song, search_subsearch
)
if not search_result:
await ctx.respond("ERR: No search result.")
return
if len(search_result) == 1:
# Error response from API
error, *_ = search_result[0]
return await ctx.respond(error)
if not isinstance(search_result[0], tuple):
return # Invalid data type
return # Invalid data type
(
search_result_artist, search_result_song, search_result_src,
search_result_confidence, search_result_time_taken
) = search_result[0] # First index is a tuple
search_result_wrapped: list[str] = search_result[1] # Second index is the wrapped lyrics
search_result_wrapped_short: list[str] = search_result[2] # Third is short wrapped lyrics
search_result_artist,
search_result_song,
search_result_src,
search_result_confidence,
search_result_time_taken,
) = search_result[
0
] # First index is a tuple
search_result_wrapped: list[str] = search_result[
1
] # Second index is the wrapped lyrics
search_result_wrapped_short: list[str] = search_result[
2
] # Third is short wrapped lyrics
if not ctx.channel.id in BOT_CHANIDS:
short_lyrics = " ".join(search_result_wrapped_short) # Replace with shortened lyrics for non spamchans
short_lyrics = regex.sub(r'\p{Vert_Space}', ' / ', short_lyrics.strip())
return await ctx.respond(f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}")
short_lyrics = " ".join(
search_result_wrapped_short
) # Replace with shortened lyrics for non spamchans
short_lyrics = regex.sub(
r"\p{Vert_Space}", " / ", short_lyrics.strip()
)
return await ctx.respond(
f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}"
)
c: int = 0
out_messages: list = []
footer: str = "" # Placeholder
footer: str = "" # Placeholder
for section in search_result_wrapped:
c+=1
c += 1
if c == len(search_result_wrapped):
footer = f"`Found on: {search_result_src}`"
# if ctx.guild.id == 1145182936002482196:
# section = section.upper()
section = regex.sub(r'\p{Vert_Space}', ' / ', section.strip())
msg: str = f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}"
section = regex.sub(r"\p{Vert_Space}", " / ", section.strip())
msg: str = (
f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}"
)
if c > 1:
msg = "\n".join(msg.split("\n")[1:])
out_messages.append(msg.strip())
for msg in out_messages:
await ctx.send(msg)
await ctx.send(msg)
except Exception as e:
traceback.print_exc()
await ctx.respond(f"ERR: {str(e)}")
@commands.user_command(name="Sing")
async def sing_context_menu(self, ctx, member: discord.Member) -> None:
"""
Sing Context Menu Command
Args:
ctx (Any): Discord context
member (discord.Member): Discord member
@ -122,67 +150,99 @@ class Sing(commands.Cog):
try:
PODY_ID: int = 1172340700663255091
IS_SPAMCHAN: bool = ctx.channel.id in BOT_CHANIDS
member_display = ctx.interaction.guild.get_member(member.id)\
.display_name
if not(ctx.interaction.guild.get_member(member.id).activities)\
and not member.id == PODY_ID:
return await ctx.respond(f"No activity detected to read for {member_display}.", ephemeral=True)
member_id: int = member.id #if not(member.id == PODY_ID) else 1234134345497837679 # Use Thomas for Pody!
member_display = ctx.interaction.guild.get_member(member.id).display_name
if (
not (ctx.interaction.guild.get_member(member.id).activities)
and not member.id == PODY_ID
):
return await ctx.respond(
f"No activity detected to read for {member_display}.",
ephemeral=True,
)
member_id: int = (
member.id
) # if not(member.id == PODY_ID) else 1234134345497837679 # Use Thomas for Pody!
activity: Optional[discord.Activity] = None
if IS_SPAMCHAN:
await ctx.respond(f"***Reading activity of {member_display}...***")
if IS_SPAMCHAN:
await ctx.respond(f"***Reading activity of {member_display}...***")
for _activity in ctx.interaction.guild.get_member(member_id).activities:
if _activity.type == discord.ActivityType.listening:
activity = _activity
parsed: Union[tuple, bool] = self.utility.parse_song_input(song=None,
activity=activity)
parsed: Union[tuple, bool] = self.utility.parse_song_input(
song=None, activity=activity
)
if not parsed:
return await ctx.respond(f"Could not parse activity of {member_display}.", ephemeral=True)
return await ctx.respond(
f"Could not parse activity of {member_display}.", ephemeral=True
)
if isinstance(parsed, tuple):
(search_artist, search_song, search_subsearch) = parsed
await ctx.respond("*Searching...*") # Must respond to interactions within 3 seconds, per Discord
search_result: Optional[list] = await self.utility.lyric_search(search_artist, search_song,
search_subsearch)
await ctx.respond(
"*Searching...*"
) # Must respond to interactions within 3 seconds, per Discord
search_result: Optional[list] = await self.utility.lyric_search(
search_artist, search_song, search_subsearch
)
if not search_result:
await ctx.respond("ERR: No search result")
return
if len(search_result) == 1 and\
isinstance(search_result[0][0], str):
return await ctx.send("ERR: No search result") # Error message from API
(search_result_artist, search_result_song, search_result_src,
search_result_confidence, search_result_time_taken) = search_result[0] # First index is a tuple
search_result_wrapped: list = search_result[1] # Second index is the wrapped lyrics
search_result_wrapped_short: list[str] = search_result[2] # Third index is shortened lyrics
if len(search_result) == 1 and isinstance(search_result[0][0], str):
return await ctx.send(
"ERR: No search result"
) # Error message from API
(
search_result_artist,
search_result_song,
search_result_src,
search_result_confidence,
search_result_time_taken,
) = search_result[
0
] # First index is a tuple
search_result_wrapped: list = search_result[
1
] # Second index is the wrapped lyrics
search_result_wrapped_short: list[str] = search_result[
2
] # Third index is shortened lyrics
if not IS_SPAMCHAN:
short_lyrics = " ".join(search_result_wrapped_short) # Replace with shortened lyrics for non spamchans
short_lyrics = regex.sub(r'\p{Vert_Space}', ' / ', short_lyrics.strip())
return await ctx.respond(f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}")
short_lyrics = " ".join(
search_result_wrapped_short
) # Replace with shortened lyrics for non spamchans
short_lyrics = regex.sub(
r"\p{Vert_Space}", " / ", short_lyrics.strip()
)
return await ctx.respond(
f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}"
)
out_messages: list = []
footer: str = ""
c: int = 0
for section in search_result_wrapped:
c+=1
c += 1
if c == len(search_result_wrapped):
footer = f"`Found on: {search_result_src}`"
# if ctx.guild.id == 1145182936002482196:
# section = section.upper()
section = regex.sub(r'\p{Vert_Space}', ' / ', section.strip())
msg: str = f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}"
section = regex.sub(r"\p{Vert_Space}", " / ", section.strip())
msg: str = (
f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}"
)
if c > 1:
msg = "\n".join(msg.split("\n")[1:])
msg = "\n".join(msg.split("\n")[1:])
out_messages.append(msg.strip())
for msg in out_messages:
await ctx.send(msg)
except Exception as e:
traceback.print_exc()
return await ctx.respond(f"ERR: {str(e)}")
def setup(bot) -> None:
"""Run on Cog Load"""
bot.add_cog(Sing(bot))

View File

@ -1,15 +1,22 @@
"""
AI
"""
class AIException(Exception):
"""AI Exception (generic)"""
pass
"""
LoveHate
"""
class LoveHateException(Exception):
"""Love Hate Exception (generic)"""
pass
@ -17,6 +24,8 @@ class LoveHateException(Exception):
Misc
"""
class MiscException(Exception):
"""Misc Exception (generic)"""
pass
pass

View File

@ -13,20 +13,20 @@ from termcolor import colored
from constants import OWNERS, BOT_CHANIDS
import api
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(message)s',
encoding='utf-8')
setproctitle.setproctitle('disc-havoc')
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(message)s", encoding="utf-8"
)
setproctitle.setproctitle("disc-havoc")
"""Auto Load Cogs"""
cogs_list: list[str] = [
'misc',
'owner',
'sing',
'meme',
'karma',
'lovehate',
'radio',
"misc",
"owner",
"sing",
"meme",
"karma",
"lovehate",
"radio",
]
bot_activity = discord.CustomActivity(name="I made cookies!")
@ -36,60 +36,68 @@ load_dotenv()
intents = discord.Intents.all()
intents.message_content = True
class Havoc(bridge.Bot):
def __init__(self) -> None:
super().__init__(command_prefix=".", intents=intents,
owner_ids=OWNERS, activity=bot_activity,
help_command=commands.MinimalHelpCommand())
super().__init__(
command_prefix=".",
intents=intents,
owner_ids=OWNERS,
activity=bot_activity,
help_command=commands.MinimalHelpCommand(),
)
self.BOT_CHANIDS = BOT_CHANIDS
self.load_exts()
def load_exts(self, initialRun: Optional[bool] = True) -> None:
"""
Load Cogs/Extensions
Args:
Args:
initialRun (Optional[bool]) default: True
Returns:
None
"""
load_method = self.load_extension if initialRun\
else self.reload_extension
load_method = self.load_extension if initialRun else self.reload_extension
for cog in cogs_list:
logging.info("Loading: %s", cog)
load_method(f'cogs.{cog}')
load_method(f"cogs.{cog}")
importlib.reload(api)
from api import API
from api import API
api_config = hypercorn.config.Config()
api_config.bind = ["127.0.0.1:5992"]
api_config.bind = ["127.0.0.1:5992"]
api_instance = api.API(self)
try:
try:
self.fapi_task.cancel()
except Exception as e:
logging.debug("Failed to cancel fapi_task: %s",
str(e))
logging.debug("Failed to cancel fapi_task: %s", str(e))
logging.info("Starting FAPI Task")
self.fapi_task: Task = self.loop.create_task(hypercorn.asyncio.serve(api_instance.api_app,
api_config))
self.fapi_task: Task = self.loop.create_task(
hypercorn.asyncio.serve(api_instance.api_app, api_config)
)
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Run on Bot Ready"""
logging.info("%s online!", self.user)
logging.info("%s online!", self.user)
def __init__() -> None:
logging.info(colored(f"Log level: {logging.getLevelName(logging.root.level)}",
"red", attrs=['reverse']))
logging.info(
colored(
f"Log level: {logging.getLevelName(logging.root.level)}",
"red",
attrs=["reverse"],
)
)
bot = Havoc()
bot.run(os.getenv('TOKEN'))
bot.run(os.getenv("TOKEN"))
if __name__ == "__main__":
__init__()
__init__()

View File

@ -12,14 +12,20 @@ from util.catbox import CatboxAsync
logger = logging.getLogger()
logger.setLevel(logging.INFO)
async def test() -> None:
f = os.path.join(os.path.expanduser("~"), "qu.png")
box1: LitterboxAsync = LitterboxAsync()
box2: CatboxAsync = CatboxAsync()
url1: Optional[str] = await box1.upload(f)
url2: Optional[str] = await box2.upload(f)
logging.info("""Uploaded URLs:
logging.info(
"""Uploaded URLs:
Litter - %s\n
Cat - %s""", url1, url2)
asyncio.run(test())
Cat - %s""",
url1,
url2,
)
asyncio.run(test())

View File

@ -1,4 +1,4 @@
import importlib
from . import discord_helpers
importlib.reload(discord_helpers)
importlib.reload(discord_helpers)

View File

@ -12,35 +12,36 @@ Catbox Uploader (Async)
catbox_api_url: str = "https://catbox.moe/user/api.php"
http_headers: dict[str, str] = {
'accept': '*/*',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
'Accept-Language': 'en-US,en;q=0.9,it;q=0.8,es;q=0.7',
'referer': 'https://www.google.com/',
'cookie': 'DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0'
}
"accept": "*/*",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53",
"Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
"referer": "https://www.google.com/",
"cookie": "DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0",
}
class CatboxAsync:
def __init__(self) -> None:
self.catbox_api_url = catbox_api_url
self.headers = http_headers
def generateRandomFileName(self, fileExt: Optional[str] = None) -> str:
"""
Generate random file name
Args:
fileExt (Optional[str]): File extension to use for naming
Returns:
str
"""
if not fileExt:
fileExt = 'png'
fileExt = "png"
return f"{random.getrandbits(32)}.{fileExt}"
async def upload(self, file: str) -> Optional[str]:
"""
Upload file to catbox
Args:
file (str): Path of file to be uploaded
Returns:
@ -49,33 +50,33 @@ class CatboxAsync:
try:
if not file:
return None
if not(os.path.exists(file)):
logging.critical("Could not find %s",
file)
if not (os.path.exists(file)):
logging.critical("Could not find %s", file)
return None
fileExt: Optional[str] = None
if file.find(".") > 0:
fileExt = "".join(file.split(".")[-1:])
with open(file, 'rb') as fileContents:
with open(file, "rb") as fileContents:
post_data: FormData = FormData()
post_data.add_field(name="reqtype",
value="fileupload")
post_data.add_field(name="userhash",
value="")
post_data.add_field(name="reqtype", value="fileupload")
post_data.add_field(name="userhash", value="")
with magic.Magic(flags=magic.MAGIC_MIME) as m:
content_type = m.id_filename(file)
post_data.add_field(name="fileToUpload",
value=fileContents,
filename=self.generateRandomFileName(fileExt),
content_type=content_type
)
post_data.add_field(
name="fileToUpload",
value=fileContents,
filename=self.generateRandomFileName(fileExt),
content_type=content_type,
)
async with ClientSession() as session:
async with await session.post(self.catbox_api_url,
headers=self.headers,
data=post_data,
timeout=ClientTimeout(connect=10, sock_read=10)) as request:
async with await session.post(
self.catbox_api_url,
headers=self.headers,
data=post_data,
timeout=ClientTimeout(connect=10, sock_read=10),
) as request:
request.raise_for_status()
return await request.text()
except:
@ -85,4 +86,4 @@ class CatboxAsync:
try:
fileContents.close()
except:
return None
return None

View File

@ -6,11 +6,13 @@ from typing import Optional, Any
Discord Helper Methods
"""
async def get_channel_by_name(bot: discord.Bot, channel: str,
guild: int | None = None) -> Optional[Any]: # Optional[Any] used as pycord channel types can be ambigious
async def get_channel_by_name(
bot: discord.Bot, channel: str, guild: int | None = None
) -> Optional[Any]: # Optional[Any] used as pycord channel types can be ambigious
"""
Get Channel by Name
Args:
bot (discord.Bot)
channel (str)
@ -18,10 +20,9 @@ async def get_channel_by_name(bot: discord.Bot, channel: str,
Returns:
Optional[Any]
"""
channel = re.sub(r'^#', '', channel.strip())
channel = re.sub(r"^#", "", channel.strip())
if not guild:
return discord.utils.get(bot.get_all_channels(),
name=channel)
return discord.utils.get(bot.get_all_channels(), name=channel)
else:
_guild: Optional[discord.Guild] = bot.get_guild(guild)
if not _guild:
@ -32,12 +33,14 @@ async def get_channel_by_name(bot: discord.Bot, channel: str,
return _channel
return None
async def send_message(bot: discord.Bot, channel: str,
message: str, guild: int | None = None) -> None:
async def send_message(
bot: discord.Bot, channel: str, message: str, guild: int | None = None
) -> None:
"""
Send Message to the provided channel. If guild is provided, will limit to channels within that guild to ensure the correct
channel is selected. Useful in the event a channel exists in more than one guild that the bot resides in.
Args:
bot (discord.Bot)
channel (str)
@ -50,9 +53,8 @@ async def send_message(bot: discord.Bot, channel: str,
channel_int: int = int(channel)
_channel = bot.get_channel(channel_int)
else:
channel = re.sub(r'^#', '', channel.strip())
_channel = await get_channel_by_name(bot=bot,
channel=channel, guild=guild)
channel = re.sub(r"^#", "", channel.strip())
_channel = await get_channel_by_name(bot=bot, channel=channel, guild=guild)
if not isinstance(_channel, discord.TextChannel):
return None
await _channel.send(message)

View File

@ -1,9 +1,9 @@
import aiohttp
import aiohttp
from typing import Optional
import regex
import regex
from regex import Pattern
import os
import random
import os
import random
import logging
import traceback
from util.catbox import CatboxAsync
@ -13,61 +13,64 @@ Jesus Meme Generator
(requires Catbox uploader)
"""
class JesusMemeGenerator():
class JesusMemeGenerator:
def __init__(self) -> None:
self.MEMEAPIURL = "https://apimeme.com/meme?meme="
self.MEMESTORAGEDIR = os.path.join(os.path.expanduser("~"),
"memes")
self.MEMESTORAGEDIR = os.path.join(os.path.expanduser("~"), "memes")
self.top_line_regex: Pattern = regex.compile(
r'[^\p{Letter}\p{Number}\p{Punctuation}\p{Horiz_Space}\p{Currency_Symbol}]'
)
r"[^\p{Letter}\p{Number}\p{Punctuation}\p{Horiz_Space}\p{Currency_Symbol}]"
)
self.bottom_line_regex: Pattern = regex.compile(
r'[^\p{Letter}\p{Number}\p{Punctuation}\p{Horiz_Space}\p{Currency_Symbol}]'
)
self.url_regex_1: Pattern = regex.compile(
r'\p{Horiz_Space}')
self.url_regex_2: Pattern = regex.compile(
r'#')
async def create_meme(self, top_line: str, bottom_line: str,
meme="Jesus-Talking-To-Cool-Dude") -> Optional[str]:
r"[^\p{Letter}\p{Number}\p{Punctuation}\p{Horiz_Space}\p{Currency_Symbol}]"
)
self.url_regex_1: Pattern = regex.compile(r"\p{Horiz_Space}")
self.url_regex_2: Pattern = regex.compile(r"#")
async def create_meme(
self, top_line: str, bottom_line: str, meme="Jesus-Talking-To-Cool-Dude"
) -> Optional[str]:
"""
Create Meme
Args:
top_line (str): Top line of meme
bottom_line (str): Bottom line of meme
meme (str): The meme to use, defaults to Jesus-Talking-To-Cool-Dude
Returns:
Optional[str]
"""
try:
if not top_line or not bottom_line:
return None
top_line = self.top_line_regex.sub('',
top_line.strip())
bottom_line = self.bottom_line_regex.sub('',
bottom_line.strip())
top_line = self.top_line_regex.sub("", top_line.strip())
bottom_line = self.bottom_line_regex.sub("", bottom_line.strip())
out_fname: Optional[str] = None
if len(top_line) < 1 or len(bottom_line) < 1:
return None
formed_url: str = f"{self.MEMEAPIURL}{meme}&top={top_line.strip()}&bottom={bottom_line.strip()}"
formed_url = self.url_regex_1.sub('+', self.url_regex_2.sub('%23', formed_url.strip()))
formed_url: str = (
f"{self.MEMEAPIURL}{meme}&top={top_line.strip()}&bottom={bottom_line.strip()}"
)
formed_url = self.url_regex_1.sub(
"+", self.url_regex_2.sub("%23", formed_url.strip())
)
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(formed_url) as response:
UUID = f"{random.getrandbits(8)}-{random.getrandbits(8)}"
out_fname = f"{UUID}.jpg"
with open(f"{self.MEMESTORAGEDIR}/{out_fname}", 'wb') as f:
with open(f"{self.MEMESTORAGEDIR}/{out_fname}", "wb") as f:
f.write(await response.read())
if not out_fname:
uploader = CatboxAsync()
meme_link: Optional[str] = await uploader.upload(f"{self.MEMESTORAGEDIR}/{out_fname}")
meme_link: Optional[str] = await uploader.upload(
f"{self.MEMESTORAGEDIR}/{out_fname}"
)
if not meme_link:
logging.info("Meme upload failed!")
return None

View File

@ -13,37 +13,38 @@ import os
litterbox_api_url: str = "https://litterbox.catbox.moe/resources/internals/api.php"
http_headers: dict[str, str] = {
'accept': '*/*',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
'Accept-Language': 'en-US,en;q=0.9,it;q=0.8,es;q=0.7',
'referer': 'https://www.google.com/',
'cookie': 'DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0'
}
"accept": "*/*",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53",
"Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7",
"referer": "https://www.google.com/",
"cookie": "DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0",
}
class LitterboxAsync:
def __init__(self) -> None:
self.headers: dict[str, str] = http_headers
self.api_path = litterbox_api_url
def generateRandomFileName(self, fileExt: Optional[str] = None) -> str:
"""
Generate Random Filename
Args:
fileExt (Optional[str]): File extension to use for naming
Returns:
str
"""
if not fileExt:
fileExt = 'png'
fileExt = "png"
return f"{random.getrandbits(32)}.{fileExt}"
async def upload(self,
file: Union[str, bytes, BufferedReader],
time='1h') -> Optional[str]:
async def upload(
self, file: Union[str, bytes, BufferedReader], time="1h"
) -> Optional[str]:
"""
Upload File to Litterbox
Args:
file (Union[str, bytes, BufferedReader]): File to upload (accepts either filepath or io.BufferedReader)
time (str): Expiration time, default: 1h
@ -59,38 +60,38 @@ class LitterboxAsync:
return None
if isinstance(file, BufferedReader):
file = file.read()
fileExt: str = 'png'
fileExt: str = "png"
if isinstance(file, str):
if file.find(".") > 0:
fileExt = "".join(file.split(".")[-1:])
file = open(file, 'rb').read()
file = open(file, "rb").read()
with magic.Magic(flags=magic.MAGIC_MIME) as m:
if isinstance(file, BufferedReader):
content_type = str(m.id_buffer(file))
else:
content_type = str(m.id_filename(file))
content_type = str(m.id_filename(file))
post_data: FormData = FormData()
post_data.add_field(name="reqtype",
value="fileupload")
post_data.add_field(name="userhash",
value="")
post_data.add_field(name="time",
value=time)
post_data.add_field(name="fileToUpload",
value=file,
filename=self.generateRandomFileName(fileExt),
content_type=content_type
post_data.add_field(name="reqtype", value="fileupload")
post_data.add_field(name="userhash", value="")
post_data.add_field(name="time", value=time)
post_data.add_field(
name="fileToUpload",
value=file,
filename=self.generateRandomFileName(fileExt),
content_type=content_type,
)
async with ClientSession() as session:
async with await session.post(self.api_path,
headers=self.headers,
data=post_data,
timeout=ClientTimeout(connect=5, sock_read=70)) as request:
async with await session.post(
self.api_path,
headers=self.headers,
data=post_data,
timeout=ClientTimeout(connect=5, sock_read=70),
) as request:
request.raise_for_status()
return await request.text()
except:

View File

@ -4,24 +4,28 @@ from typing import LiteralString, Optional, Union
import aiosqlite as sqlite3
from constructors import LoveHateException
class DB:
"""LoveHate DB Utility Class"""
def __init__(self, bot) -> None:
self.db_path: str|LiteralString = os.path.join("/usr/local/share",
"sqlite_dbs", "lovehate.db")
async def get_wholovehates(self, thing: str, loves: bool = False,
hates: bool = False) -> Union[list[tuple], bool]:
self.db_path: str | LiteralString = os.path.join(
"/usr/local/share", "sqlite_dbs", "lovehate.db"
)
async def get_wholovehates(
self, thing: str, loves: bool = False, hates: bool = False
) -> Union[list[tuple], bool]:
"""
Get a list of users who have professed their love OR hatred for <thing>
Args:
thing (str): The <thing> to check
loves (bool): Are we looking for loves?
hates (bool): ...or are we looking for hates?
Returns:
Union[list[tuple], bool]
"""
"""
query: str = "SELECT display_name FROM lovehate WHERE thing LIKE ? AND flag = ?"
params: tuple = tuple()
@ -35,8 +39,11 @@ class DB:
flag = 1
elif not hates and not loves:
raise LoveHateException("Neither loves nor hates were requested")
params = (thing, flag,)
params = (
thing,
flag,
)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(query, params) as db_cursor:
result: list[tuple] = await db_cursor.fetchall()
@ -44,28 +51,35 @@ class DB:
if not result:
return False
return result
async def get_lovehates(self, loves: bool = False, hates: bool = False,
user: Optional[str] = None, thing: Optional[str] = None) -> Union[list[tuple], bool]:
async def get_lovehates(
self,
loves: bool = False,
hates: bool = False,
user: Optional[str] = None,
thing: Optional[str] = None,
) -> Union[list[tuple], bool]:
"""
Get a list of either 1) what {user} loves/hates, or who loves/hates {thing}, depending on bools loves, hates
Args:
loves (bool): Are we looking for loves?
hates (bool): ...OR are we looking for hates?
user (Optional[str]): the user to query against
thing (Optional[str]): ... OR the thing to query against
Returns:
Union[list[tuple], bool]
"""
query: str = ""
params: tuple = tuple()
if not user and not thing:
raise LoveHateException("Neither a <user> or <thing> was specified to query against")
raise LoveHateException(
"Neither a <user> or <thing> was specified to query against"
)
flag: Optional[int] = None
if hates and loves:
@ -79,10 +93,16 @@ class DB:
if user:
query = "SELECT thing FROM lovehate WHERE display_name LIKE ? AND flag == ?"
params = (user, flag,)
params = (
user,
flag,
)
elif thing:
query = "SELECT display_name FROM lovehate WHERE thing LIKE ? AND flag == ?"
params = (thing, flag,)
params = (
thing,
flag,
)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(query, params) as db_cursor:
@ -91,22 +111,27 @@ class DB:
return False
return result
async def check_existence(self, user: str, thing: str) -> Optional[int]:
"""
Determine whether a user is opinionated on a <thing>
Args:
user (str): The user to check
thing (str): The thing to check if the user has an opinion on
Returns:
Optional[int]
"""
params = (user, thing,)
params = (
user,
thing,
)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute("SELECT id, flag FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?", params) as db_cursor:
async with await db_conn.execute(
"SELECT id, flag FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?",
params,
) as db_cursor:
result = await db_cursor.fetchone()
if not result:
return None
@ -116,29 +141,38 @@ class DB:
async def update(self, user: str, thing: str, flag: int) -> str:
"""
Updates the lovehate database, and returns an appropriate response
Args:
user (str): The user to update
thing (str): The thing the user loves/hates/doesn't care about anymore
flag (int): int representation of love (1), hate (-1), and dontcare (0)
Returns:
Returns:
str
"""
if not flag in range(-1, 2):
raise LoveHateException(f"Invalid flag {flag} specified, is this love (1), hate (-1), or dontcare? (0)")
raise LoveHateException(
f"Invalid flag {flag} specified, is this love (1), hate (-1), or dontcare? (0)"
)
db_query: str = ""
params: tuple = (user, thing,)
params: tuple = (
user,
thing,
)
already_opinionated: Optional[int] = await self.check_existence(user, thing)
if already_opinionated:
if already_opinionated:
if flag == 0:
db_query = "DELETE FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?"
db_query = (
"DELETE FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?"
)
else:
loves_or_hates: str = "loves"
if already_opinionated == -1:
loves_or_hates = "hates"
raise LoveHateException(f"But {user} already {loves_or_hates} {thing}...")
raise LoveHateException(
f"But {user} already {loves_or_hates} {thing}..."
)
else:
match flag:
case -1:
@ -147,19 +181,22 @@ class DB:
db_query = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, 1, ?)"
case _:
raise LoveHateException("Unknown error, default case matched")
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(db_query, params) as db_cursor:
await db_conn.commit()
if db_cursor.rowcount != 1:
raise LoveHateException(f"DB Error - RowCount: {db_cursor.rowcount} for INSERT query")
raise LoveHateException(
f"DB Error - RowCount: {db_cursor.rowcount} for INSERT query"
)
match flag:
case -1:
return f"We're done here, {user} hates {thing}."
case 0:
case 0:
return f"We're done here, {user} no longer cares one way or the other about {thing}."
case 1:
return f"We're done here, {user} loves {thing}."
case _:
raise LoveHateException("Unknown error, default case matched [2]")
raise LoveHateException(
"Unknown error, default case matched [2]"
)

View File

@ -11,144 +11,205 @@ from aiohttp import ClientSession, ClientTimeout
from bohancompliment import ComplimentGenerator
from discord import Embed
class Util:
"""Misc Utility"""
def __init__(self) -> None:
self.URL_URBANDICTIONARY: str = "http://api.urbandictionary.com/v0/define"
self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult"
self.COMPLIMENT_GENERATOR = ComplimentGenerator()
self.dbs: dict[str, str|LiteralString] = {
'whisky': os.path.join("/usr/local/share",
"sqlite_dbs", "whiskey.db"),
'drinks': os.path.join("/usr/local/share",
"sqlite_dbs", "cocktails.db"),
'strains': os.path.join("/usr/local/share",
"sqlite_dbs", "strains.db"),
'qajoke': os.path.join("/usr/local/share",
"sqlite_dbs", "qajoke.db"),
'rjokes': os.path.join("/usr/local/share",
"sqlite_dbs", "rjokes.db"),
'randmsg': os.path.join("/usr/local/share",
"sqlite_dbs", "randmsg.db"),
'stats': os.path.join("/usr/local/share",
"sqlite_dbs", "havoc_stats.db"),
'cookies': os.path.join("/usr/local/share",
"sqlite_dbs", "cookies.db"),
self.dbs: dict[str, str | LiteralString] = {
"whisky": os.path.join("/usr/local/share", "sqlite_dbs", "whiskey.db"),
"drinks": os.path.join("/usr/local/share", "sqlite_dbs", "cocktails.db"),
"strains": os.path.join("/usr/local/share", "sqlite_dbs", "strains.db"),
"qajoke": os.path.join("/usr/local/share", "sqlite_dbs", "qajoke.db"),
"rjokes": os.path.join("/usr/local/share", "sqlite_dbs", "rjokes.db"),
"randmsg": os.path.join("/usr/local/share", "sqlite_dbs", "randmsg.db"),
"stats": os.path.join("/usr/local/share", "sqlite_dbs", "havoc_stats.db"),
"cookies": os.path.join("/usr/local/share", "sqlite_dbs", "cookies.db"),
}
self.COFFEES: list = ['a cup of french-pressed coffee', 'a cup of cold brew', 'a cup of flash brew',
'a cup of Turkish coffee', 'a cup of Moka', 'an espresso',
'a cup of Nescafe coffee',
'an iced coffee', 'a Frappé', 'a freddo cappuccino',
'a cup of Chock full o\'Nuts', 'a cup of Folgers', 'a cup of Lavazza',
'a cup of Maxwell House', 'a cup of Moccona', 'a cup of Mr. Brown Coffee',
'a cup of affogato al caffè',
'a cup of Caffè Medici', 'a cup of Café Touba',
'a double-double', 'an indian filter coffee', 'a cup of pocillo',
'a cup of caffè americano', 'a cup of caffè lungo', 'a latte', 'a manilo',
'a flat white', 'a cup of café cubano', 'a cup of caffè crema',
'a cup of cafe zorro', 'an espresso roberto', 'an espresso romano',
'an espresso sara', 'a guillermo', 'a ristretto', 'a cup of melya',
'a cup of caffè marocchino', 'a cup of café miel', 'a cup of café de olla',
'a Mazagran', 'a Palazzo', 'an ice shot', 'a macchiato',
'a cortado', 'a red eye', 'a cappuccino',
'a mocha', 'a café au lait', 'a bicerin',
'a caffè corretto', 'a ca phe trung', 'a café bombón', 'a Vienna coffee',
'a flat black', 'a lungo', 'a doppio', 'a ristretto bianco', 'a piccolo latte',
'a gibraltar', 'a breve', 'a café con leche', 'a su café', 'a café del tiempo',
'a java chip frappuccino', 'a pumpkin spice latte', 'a caramel macchiato',
'a white chocolate mocha', 'a hazelnut coffee', 'a toffee nut latte',
'a peppermint mocha', 'a cinnamon dolce latte', 'a coconut milk latte',
'an almond milk cappuccino', 'an oat milk latte', 'a caramel frappuccino',
'a chocolate frappuccino', 'a butter pecan coffee', 'a maple pecan latte',
'a sea salt caramel mocha', 'a nitro cold brew', 'a pumpkin cold brew',
'a honey almond flat white', 'a sweet cream cold brew', 'a matcha latte',
'a golden latte', 'a turmeric latte', 'a beetroot latte', 'a kopi luwak']
self.COFFEES: list = [
"a cup of french-pressed coffee",
"a cup of cold brew",
"a cup of flash brew",
"a cup of Turkish coffee",
"a cup of Moka",
"an espresso",
"a cup of Nescafe coffee",
"an iced coffee",
"a Frappé",
"a freddo cappuccino",
"a cup of Chock full o'Nuts",
"a cup of Folgers",
"a cup of Lavazza",
"a cup of Maxwell House",
"a cup of Moccona",
"a cup of Mr. Brown Coffee",
"a cup of affogato al caffè",
"a cup of Caffè Medici",
"a cup of Café Touba",
"a double-double",
"an indian filter coffee",
"a cup of pocillo",
"a cup of caffè americano",
"a cup of caffè lungo",
"a latte",
"a manilo",
"a flat white",
"a cup of café cubano",
"a cup of caffè crema",
"a cup of cafe zorro",
"an espresso roberto",
"an espresso romano",
"an espresso sara",
"a guillermo",
"a ristretto",
"a cup of melya",
"a cup of caffè marocchino",
"a cup of café miel",
"a cup of café de olla",
"a Mazagran",
"a Palazzo",
"an ice shot",
"a macchiato",
"a cortado",
"a red eye",
"a cappuccino",
"a mocha",
"a café au lait",
"a bicerin",
"a caffè corretto",
"a ca phe trung",
"a café bombón",
"a Vienna coffee",
"a flat black",
"a lungo",
"a doppio",
"a ristretto bianco",
"a piccolo latte",
"a gibraltar",
"a breve",
"a café con leche",
"a su café",
"a café del tiempo",
"a java chip frappuccino",
"a pumpkin spice latte",
"a caramel macchiato",
"a white chocolate mocha",
"a hazelnut coffee",
"a toffee nut latte",
"a peppermint mocha",
"a cinnamon dolce latte",
"a coconut milk latte",
"an almond milk cappuccino",
"an oat milk latte",
"a caramel frappuccino",
"a chocolate frappuccino",
"a butter pecan coffee",
"a maple pecan latte",
"a sea salt caramel mocha",
"a nitro cold brew",
"a pumpkin cold brew",
"a honey almond flat white",
"a sweet cream cold brew",
"a matcha latte",
"a golden latte",
"a turmeric latte",
"a beetroot latte",
"a kopi luwak",
]
self.LAST_5_COFFEES: list = []
def tdTuple(self, td:datetime.timedelta) -> tuple:
def tdTuple(self, td: datetime.timedelta) -> tuple:
"""
Create TimeDelta Tuple
Args:
td (datetime.timedelta)
Returns:
tuple
"""
def _t(t, n):
if t < n:
if t < n:
return (t, 0)
v = t//n
return (t - (v * n), v)
v = t // n
return (t - (v * n), v)
(s, h) = _t(td.seconds, 3600)
(s, m) = _t(s, 60)
(s, m) = _t(s, 60)
(mics, mils) = _t(td.microseconds, 1000)
return (td.days, h, m, s, mics, mils)
async def get_counter(self, counter: Optional[str] = None) -> Optional[dict]:
"""
Get Counter
Args:
counter (Optional[str])
Returns:
Optional[dict]
"""
stats_db: str|LiteralString = self.dbs.get('stats', '')
stats_db: str | LiteralString = self.dbs.get("stats", "")
if not stats_db:
return None
async with sqlite3.connect(stats_db,
timeout=3) as db_conn:
async with sqlite3.connect(stats_db, timeout=3) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT ? FROM stats LIMIT 1"
if not counter:
query = "SELECT * FROM stats LIMIT 1"
async with await db_conn.execute(query, (counter,) if counter else None) as db_cursor:
async with await db_conn.execute(
query, (counter,) if counter else None
) as db_cursor:
result = await db_cursor.fetchone()
return result
async def get_stats_embed(self) -> Optional[Embed]:
"""
Get Stats Embed
Returns:
Optional[Embed]
"""
counters: Optional[dict] = await self.get_counter()
if not counters:
return None
embed: Embed = Embed(title="Stats")
counter_message: str = ""
counters_sorted: dict = dict(sorted(counters.items(),
key=lambda item: item[1], reverse=True))
counters_sorted: dict = dict(
sorted(counters.items(), key=lambda item: item[1], reverse=True)
)
for counter, value in counters_sorted.items():
counter = regex.sub(r'_', ' ',
counter.strip()).title()
counter = regex.sub(r"_", " ", counter.strip()).title()
counter_message += f"- {value} {counter}\n"
embed.description = counter_message.strip()
return embed
async def increment_counter(self, counter: str) -> bool:
"""
Increment Counter
Args:
counter (str)
Returns:
bool
"""
stats_db: str|LiteralString = self.dbs.get('stats', '')
stats_db: str | LiteralString = self.dbs.get("stats", "")
if not stats_db:
return False
async with sqlite3.connect(stats_db,
timeout=3) as db_conn:
async with await db_conn.execute(f"UPDATE stats SET {counter} = {counter} + 1") as db_cursor:
async with sqlite3.connect(stats_db, timeout=3) as db_conn:
async with await db_conn.execute(
f"UPDATE stats SET {counter} = {counter} + 1"
) as db_cursor:
if db_cursor.rowcount < 0:
logging.critical("[karma::increment_counter] Fail! %s", db_cursor.rowcount)
logging.critical(
"[karma::increment_counter] Fail! %s", db_cursor.rowcount
)
return False
await db_conn.commit()
return True
@ -156,32 +217,39 @@ class Util:
async def get_ud_def(self, term: str) -> tuple[str, str]:
"""
Get Definition from UD
Args:
term (str)
Returns:
tuple[str, str]
"""
try:
async with ClientSession() as session:
async with await session.get(self.URL_URBANDICTIONARY,
params={
"term": term,
},
headers = {
'content-type': 'application/json; charset=utf-8',
}, timeout=ClientTimeout(connect=5, sock_read=5)) as request:
logging.debug("UD returned: %s",
await request.text())
async with await session.get(
self.URL_URBANDICTIONARY,
params={
"term": term,
},
headers={
"content-type": "application/json; charset=utf-8",
},
timeout=ClientTimeout(connect=5, sock_read=5),
) as request:
logging.debug("UD returned: %s", await request.text())
data: dict = await request.json()
if "list" in data:
definitions: list[dict] = data["list"]
if definitions:
definition: dict = definitions[0]
definition_word: str = definition.get("word", "N/A")
definition_text: str = regex.sub(r'(\r|\n|\r\n)', ' ', definition["definition"].strip())
return (definition_word, definition_text) # Tuple: Returned word, returned definition
definition_text: str = regex.sub(
r"(\r|\n|\r\n)", " ", definition["definition"].strip()
)
return (
definition_word,
definition_text,
) # Tuple: Returned word, returned definition
else:
return (term, "Not found!")
else:
@ -193,24 +261,24 @@ class Util:
async def get_insult(self, recipient: str) -> str:
"""
Get Insult
Args:
recipient (str)
Returns:
str
"""
async with ClientSession() as session:
async with await session.get(f"{self.URL_INSULTAPI}?who={recipient}") as request:
async with await session.get(
f"{self.URL_INSULTAPI}?who={recipient}"
) as request:
request.raise_for_status()
return await request.text()
async def get_compliment(self, subject: str,
language: Optional[str] = 'en') -> str:
async def get_compliment(self, subject: str, language: Optional[str] = "en") -> str:
"""
Get Compliment
Args:
subject (str)
language (Optional[str]) (default: 'en')
@ -223,95 +291,117 @@ class Util:
async def get_whisky(self) -> Optional[tuple]:
"""
Get Whisky
Returns:
Optional[tuple]
"""
whisky_db: str|LiteralString = self.dbs.get('whisky', '')
whisky_db: str | LiteralString = self.dbs.get("whisky", "")
if not whisky_db:
return None
async with sqlite3.connect(database=whisky_db,
timeout=2) as db_conn:
db_query: str = "SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1"
async with sqlite3.connect(database=whisky_db, timeout=2) as db_conn:
db_query: str = (
"SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1"
)
async with await db_conn.execute(db_query) as db_cursor:
db_result: Optional[Union[sqlite3.Row, tuple]] = await db_cursor.fetchone()
db_result: Optional[Union[sqlite3.Row, tuple]] = (
await db_cursor.fetchone()
)
if not db_result:
return None
(name, category, description) = db_result
name = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
regex.sub(r'\p{White_Space}{2,}', ' ',
name.strip()))
category = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
regex.sub(r'\p{White_Space}{2,}', ' ',
category.strip()))
description = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
regex.sub(r'\p{White_Space}{2,}', ' ',
description.strip()))
name = regex.sub(
r"(^\p{White_Space}|\r|\n)",
"",
regex.sub(r"\p{White_Space}{2,}", " ", name.strip()),
)
category = regex.sub(
r"(^\p{White_Space}|\r|\n)",
"",
regex.sub(r"\p{White_Space}{2,}", " ", category.strip()),
)
description = regex.sub(
r"(^\p{White_Space}|\r|\n)",
"",
regex.sub(r"\p{White_Space}{2,}", " ", description.strip()),
)
return (name, category, description)
async def get_drink(self) -> Optional[tuple]:
"""
Get Drink
Returns:
Optional[tuple]
"""
drinks_db: str|LiteralString = self.dbs.get('drinks', '')
drinks_db: str | LiteralString = self.dbs.get("drinks", "")
if not drinks_db:
return None
async with sqlite3.connect(database=drinks_db,
timeout=2) as db_conn:
db_query: str = "SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1"
async with sqlite3.connect(database=drinks_db, timeout=2) as db_conn:
db_query: str = (
"SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1"
)
async with await db_conn.execute(db_query) as db_cursor:
db_result: tuple = await db_cursor.fetchone()
(name, ingredients) = db_result
name = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', name.strip()))
ingredients = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', ingredients.strip()))
ingredients = regex.sub(r'\*', '\u2731', ingredients.strip())
name = regex.sub(
r"(^\p{White_Space}|\r|\n)",
"",
regex.sub(r"\p{White_Space}{2,}", " ", name.strip()),
)
ingredients = regex.sub(
r"(^\p{White_Space}|\r|\n)",
"",
regex.sub(r"\p{White_Space}{2,}", " ", ingredients.strip()),
)
ingredients = regex.sub(r"\*", "\u2731", ingredients.strip())
return (name, ingredients)
async def get_strain(self, strain: Optional[str] = None) -> Optional[tuple]:
"""
Get Strain
Args:
strain (Optional[str])
Returns:
Optional[tuple]
"""
strains_db: str|LiteralString = self.dbs.get('strains', '')
strains_db: str | LiteralString = self.dbs.get("strains", "")
if not strains_db:
return None
async with sqlite3.connect(database=strains_db,
timeout=2) as db_conn:
async with sqlite3.connect(database=strains_db, timeout=2) as db_conn:
db_params: Optional[tuple] = None
if not strain:
db_query: str = "SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1"
db_query: str = (
"SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1"
)
else:
db_query = "SELECT name, description FROM strains_w_desc WHERE name LIKE ?"
db_params = (f"%{strain.strip()}%",)
db_query = (
"SELECT name, description FROM strains_w_desc WHERE name LIKE ?"
)
db_params = (f"%{strain.strip()}%",)
async with await db_conn.execute(db_query, db_params) as db_cursor:
db_result: Optional[tuple] = await db_cursor.fetchone()
return db_result
async def get_qajoke(self) -> Optional[tuple]:
"""
Get QA Joke
Returns:
Optional[tuple]
"""
qajoke_db: str|LiteralString = self.dbs.get('qajoke', '')
qajoke_db: str | LiteralString = self.dbs.get("qajoke", "")
if not qajoke_db:
return None
async with sqlite3.connect(database=qajoke_db,
timeout=2) as db_conn:
db_query: str = "SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1"
async with sqlite3.connect(database=qajoke_db, timeout=2) as db_conn:
db_query: str = (
"SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1"
)
async with await db_conn.execute(db_query) as cursor:
(question, answer) = await cursor.fetchone()
return (question, answer)
@ -320,107 +410,110 @@ class Util:
async def get_rjoke(self) -> Optional[tuple]:
"""
Get r/joke Joke
Returns:
Optional[tuple]
"""
rjokes_db: str|LiteralString = self.dbs.get('rjokes', '')
rjokes_db: str | LiteralString = self.dbs.get("rjokes", "")
if not rjokes_db:
return None
async with sqlite3.connect(database=rjokes_db, timeout=2) as db_conn:
db_query: str = "SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1'"
db_query: str = (
"SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1'"
)
async with await db_conn.execute(db_query) as cursor:
(title, body, score) = await cursor.fetchone()
return (title, body, score)
return None
return None
async def get_random_fact(self) -> str:
"""
Get Random Fact
Returns:
str
"""
try:
facts_api_url: str = "https://uselessfacts.jsph.pl/api/v2/facts/random"
facts_backup_url: str = "https://cnichols1734.pythonanywhere.com/facts/random"
facts_backup_url: str = (
"https://cnichols1734.pythonanywhere.com/facts/random"
)
async with ClientSession() as client:
try:
async with await client.get(facts_api_url,
timeout=ClientTimeout(connect=5, sock_read=5)) as request:
async with await client.get(
facts_api_url, timeout=ClientTimeout(connect=5, sock_read=5)
) as request:
_json: dict = await request.json()
fact: str = _json.get('text', None)
fact: str = _json.get("text", None)
if not fact:
raise BaseException("RandFact Src 1 Failed")
return fact
except:
async with await client.get(facts_backup_url,
timeout=ClientTimeout(connect=5, sock_read=5)) as request:
async with await client.get(
facts_backup_url, timeout=ClientTimeout(connect=5, sock_read=5)
) as request:
_json = await request.json()
fact = _json.get('fact', None)
fact = _json.get("fact", None)
return fact
except Exception as e:
traceback.print_exc()
return f"Failed to get a random fact :( [{str(e)}]"
async def get_cookie(self) -> Optional[dict]:
"""
Get Cookie
Returns:
Optional[dict]
"""
cookies_db = self.dbs.get('cookies', '')
cookies_db = self.dbs.get("cookies", "")
if not cookies_db:
return None
async with sqlite3.connect(cookies_db,
timeout=2) as db_conn:
db_query: str = "SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1"
async with sqlite3.connect(cookies_db, timeout=2) as db_conn:
db_query: str = (
"SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1"
)
async with await db_conn.execute(db_query) as db_cursor:
(name, origin, image_url) = await db_cursor.fetchone()
return {
'name': name,
'origin': origin,
'image_url': image_url
}
return {"name": name, "origin": origin, "image_url": image_url}
def get_coffee(self,
recipient_allergic: Optional[bool] = False) -> Optional[str]:
def get_coffee(self, recipient_allergic: Optional[bool] = False) -> Optional[str]:
"""
Get Coffee
Args:
recipient_allergic (bool): Is the recipient allergic? (so we know when to keep our nuts out of it)
Returns:
str
"""
try:
randomCoffee: str = random.choice(self.COFFEES)
if self.LAST_5_COFFEES and randomCoffee in self.LAST_5_COFFEES\
or (recipient_allergic and "nut" in randomCoffee.lower()):
return self.get_coffee() # Recurse
if (
self.LAST_5_COFFEES
and randomCoffee in self.LAST_5_COFFEES
or (recipient_allergic and "nut" in randomCoffee.lower())
):
return self.get_coffee() # Recurse
if len(self.LAST_5_COFFEES) >= 5:
self.LAST_5_COFFEES.pop() # Store no more than 5 of the last served coffees
self.LAST_5_COFFEES.pop() # Store no more than 5 of the last served coffees
self.LAST_5_COFFEES.append(randomCoffee)
return randomCoffee
except:
traceback.print_exc()
return None
def get_days_to_xmas(self) -> Optional[tuple]:
"""
Get # of Days until Xmas
Returns:
Optional[tuple]
"""
today: datetime.datetime = datetime.datetime.now(tz=pytz.UTC)
xmas: datetime.datetime = datetime.datetime(
@ -429,25 +522,24 @@ class Util:
day=25,
tzinfo=pytz.UTC,
)
td: datetime.timedelta = (xmas - today)
td: datetime.timedelta = xmas - today
days, hours, minutes, seconds, us, ms = self.tdTuple(td)
return (days, hours, minutes, seconds, ms, us)
async def get_randmsg(self) -> Optional[str]:
"""
Get Random Message from randmsg.db
Returns:
Optional[str]
"""
randmsg_db: str|LiteralString = self.dbs.get('randmsg', '')
randmsg_db: str | LiteralString = self.dbs.get("randmsg", "")
if not randmsg_db:
return None
async with sqlite3.connect(database=randmsg_db,
timeout=2) as db_conn:
async with sqlite3.connect(database=randmsg_db, timeout=2) as db_conn:
db_query: str = "SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1"
async with await db_conn.execute(db_query) as db_cursor:
(result,) = await db_cursor.fetchone()
return result
return result

View File

@ -4,6 +4,7 @@ from aiohttp import ClientSession, ClientTimeout
"""Radio Utils"""
async def get_now_playing() -> Optional[str]:
"""
Get radio now playing
@ -15,17 +16,21 @@ async def get_now_playing() -> Optional[str]:
np_url: str = "https://api.codey.lol/radio/np"
try:
async with ClientSession() as session:
async with await session.post(np_url, headers={
'content-type': 'application/json; charset=utf-8',
}, timeout=ClientTimeout(connect=1.5, sock_read=1.5)) as request:
async with await session.post(
np_url,
headers={
"content-type": "application/json; charset=utf-8",
},
timeout=ClientTimeout(connect=1.5, sock_read=1.5),
) as request:
request.raise_for_status()
response_json = await request.json()
artistsong = response_json.get('artistsong')
artistsong = response_json.get("artistsong")
return artistsong
except Exception as e:
logging.critical("Now playing retrieval failed: %s",
str(e))
return None
logging.critical("Now playing retrieval failed: %s", str(e))
return None
async def skip() -> bool:
"""
@ -38,13 +43,13 @@ async def skip() -> bool:
try:
ls_uri: str = "http://127.0.0.1:29000"
async with ClientSession() as session:
async with session.get(f"{ls_uri}/next",
timeout=ClientTimeout(connect=2, sock_read=2)) as request:
request.raise_for_status()
text: Optional[str] = await request.text()
return text == "OK"
async with session.get(
f"{ls_uri}/next", timeout=ClientTimeout(connect=2, sock_read=2)
) as request:
request.raise_for_status()
text: Optional[str] = await request.text()
return text == "OK"
except Exception as e:
logging.debug("Skip failed: %s", str(e))
return False # failsafe
logging.debug("Skip failed: %s", str(e))
return False # failsafe

View File

@ -6,21 +6,24 @@ import traceback
from discord import Activity
from typing import Optional, Union
class Utility:
"""Sing Utility"""
def __init__(self) -> None:
self.api_url: str = "http://127.0.0.1:52111/lyric/search"
self.api_src: str = "DISC-HAVOC"
def parse_song_input(self, song: Optional[str] = None,
activity: Optional[Activity] = None) -> Union[bool, tuple]:
def parse_song_input(
self, song: Optional[str] = None, activity: Optional[Activity] = None
) -> Union[bool, tuple]:
"""
Parse Song (Sing Command) Input
Args:
song (Optional[str]): Song to search
activity (Optional[discord.Activity]): Discord activity, used to attempt lookup if no song is provided
Returns:
Union[bool, tuple]
"""
@ -29,13 +32,18 @@ class Utility:
return False
if not song and activity:
if not activity.name:
return False # No valid activity found
return False # No valid activity found
match activity.name.lower():
case "codey toons" | "cider" | "sonixd":
search_artist: str = " ".join(str(activity.state)\
.strip().split(" ")[1:])
search_artist = regex.sub(r"(\s{0,})(\[(spotify|tidal|sonixd|browser|yt music)])$", "",
search_artist.strip(), flags=regex.IGNORECASE)
search_artist: str = " ".join(
str(activity.state).strip().split(" ")[1:]
)
search_artist = regex.sub(
r"(\s{0,})(\[(spotify|tidal|sonixd|browser|yt music)])$",
"",
search_artist.strip(),
flags=regex.IGNORECASE,
)
search_song = str(activity.details)
song = f"{search_artist} : {search_song}"
case "tidal hi-fi":
@ -43,110 +51,136 @@ class Utility:
search_song = str(activity.details)
song = f"{search_artist} : {search_song}"
case "spotify":
if not activity.title or not activity.artist: # type: ignore
if not activity.title or not activity.artist: # type: ignore
"""
Attributes exist, but mypy does not recognize them. Ignored.
"""
return False
search_artist = str(activity.title) # type: ignore
search_song = str(activity.artist) # type: ignore
search_artist = str(activity.title) # type: ignore
search_song = str(activity.artist) # type: ignore
song = f"{search_artist} : {search_song}"
case "serious.fm" | "cocks.fm" | "something":
if not activity.details:
song = str(activity.state)
else:
search_artist = str(activity.state).rsplit("[", maxsplit=1)[0] # Strip genre
search_artist = str(activity.state).rsplit("[", maxsplit=1)[
0
] # Strip genre
search_song = str(activity.details)
song = f"{search_artist} : {search_song}"
case _:
return False # Unsupported activity detected
search_split_by: str = ":" if not(song) or len(song.split(":")) > 1\
else "-" # Support either : or - to separate artist/track
return False # Unsupported activity detected
search_split_by: str = (
":" if not (song) or len(song.split(":")) > 1 else "-"
) # Support either : or - to separate artist/track
if not song:
return False
search_artist = song.split(search_split_by)[0].strip()
search_song = "".join(song.split(search_split_by)[1:]).strip()
search_subsearch: Optional[str] = None
if search_split_by == ":" and len(song.split(":")) > 2: # Support sub-search if : is used (per instructions)
search_song = song.split(search_split_by)[1].strip() # Reduce search_song to only the 2nd split of : [the rest is meant to be lyric text]
search_subsearch = "".join(song.split(search_split_by)[2:]) # Lyric text from split index 2 and beyond
if (
search_split_by == ":" and len(song.split(":")) > 2
): # Support sub-search if : is used (per instructions)
search_song = song.split(search_split_by)[
1
].strip() # Reduce search_song to only the 2nd split of : [the rest is meant to be lyric text]
search_subsearch = "".join(
song.split(search_split_by)[2:]
) # Lyric text from split index 2 and beyond
return (search_artist, search_song, search_subsearch)
except:
traceback.print_exc()
return False
async def lyric_search(self, artist: str, song: str,
sub: Optional[str] = None) -> Optional[list]:
async def lyric_search(
self, artist: str, song: str, sub: Optional[str] = None
) -> Optional[list]:
"""
Lyric Search
Args:
artist (str): Artist to search
song (str): Song to search
sub (Optional[str]): Lyrics for subsearch
sub (Optional[str]): Lyrics for subsearch
Returns:
Optional[list]
"""
try:
if not artist or not song:
return [("FAIL! Artist/Song not provided",)]
search_obj: dict = {
'a': artist.strip(),
's': song.strip(),
'extra': True,
'src': self.api_src,
"a": artist.strip(),
"s": song.strip(),
"extra": True,
"src": self.api_src,
}
if len(song.strip()) < 1:
search_obj.pop('a')
search_obj.pop('s')
search_obj['t'] = artist.strip() # Parse failed, try title without sep
search_obj.pop("a")
search_obj.pop("s")
search_obj["t"] = artist.strip() # Parse failed, try title without sep
if sub and len(sub) >= 2:
search_obj['sub'] = sub.strip()
search_obj["sub"] = sub.strip()
async with aiohttp.ClientSession() as session:
async with await session.post(self.api_url,
json=search_obj,
timeout=aiohttp.ClientTimeout(connect=5, sock_read=10)) as request:
async with await session.post(
self.api_url,
json=search_obj,
timeout=aiohttp.ClientTimeout(connect=5, sock_read=10),
) as request:
request.raise_for_status()
response: dict = await request.json()
if response.get('err'):
if response.get("err"):
return [(f"ERR: {response.get('errorText')}",)]
out_lyrics = regex.sub(r'<br>', '\u200B\n', response.get('lyrics', ''))
out_lyrics = regex.sub(
r"<br>", "\u200b\n", response.get("lyrics", "")
)
response_obj: dict = {
'artist': response.get('artist'),
'song': response.get('song'),
'lyrics': out_lyrics,
'src': response.get('src'),
'confidence': float(response.get('confidence', 0.0)),
'time': float(response.get('time', -1.0)),
"artist": response.get("artist"),
"song": response.get("song"),
"lyrics": out_lyrics,
"src": response.get("src"),
"confidence": float(response.get("confidence", 0.0)),
"time": float(response.get("time", -1.0)),
}
lyrics = response_obj.get('lyrics')
lyrics = response_obj.get("lyrics")
if not lyrics:
return None
response_obj['lyrics'] = textwrap.wrap(text=lyrics.strip(),
width=1500, drop_whitespace=False,
replace_whitespace=False, break_long_words=True,
break_on_hyphens=True, max_lines=8)
response_obj['lyrics_short'] = textwrap.wrap(text=lyrics.strip(),
width=750, drop_whitespace=False,
replace_whitespace=False, break_long_words=True,
break_on_hyphens=True, max_lines=1)
response_obj["lyrics"] = textwrap.wrap(
text=lyrics.strip(),
width=1500,
drop_whitespace=False,
replace_whitespace=False,
break_long_words=True,
break_on_hyphens=True,
max_lines=8,
)
response_obj["lyrics_short"] = textwrap.wrap(
text=lyrics.strip(),
width=750,
drop_whitespace=False,
replace_whitespace=False,
break_long_words=True,
break_on_hyphens=True,
max_lines=1,
)
return [
(
response_obj.get('artist'), response_obj.get('song'), response_obj.get('src'),
response_obj.get("artist"),
response_obj.get("song"),
response_obj.get("src"),
f"{int(response_obj.get('confidence', -1.0))}%",
f"{response_obj.get('time', -666.0):.4f}s",
),
response_obj.get('lyrics'),
response_obj.get('lyrics_short'),
]
response_obj.get("lyrics"),
response_obj.get("lyrics_short"),
]
except Exception as e:
traceback.print_exc()
return [f"Retrieval failed: {str(e)}"]
return [f"Retrieval failed: {str(e)}"]