This commit is contained in:
codey 2025-05-20 11:13:49 -04:00
parent b13c2eec2a
commit b3f0e084ce
6 changed files with 58 additions and 403 deletions

View File

@ -1,364 +0,0 @@
import sys
from os import path
sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
import constants
import traceback
import time
import importlib
import logging
from typing import Optional
import discord
import regex
from regex import Pattern
from aiohttp import ClientSession, ClientTimeout
from discord.ext import bridge, commands, tasks
from disc_havoc import Havoc
class Util:
"""Karma Utility"""
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.api_key: str = constants.PRV_API_KEY
self.karma_endpoints_base_url: str = "https://api.codey.lol/karma/"
self.karma_retrieval_url: str = f"{self.karma_endpoints_base_url}get"
self.karma_update_url: str = f"{self.karma_endpoints_base_url}modify"
self.karma_top_10_url: str = f"{self.karma_endpoints_base_url}top"
self.timers: dict = {} # discord uid : timestamp, used for rate limiting
self.karma_cooldown: int = 15 # 15 seconds between karma updates
async def get_karma(self, keyword: str) -> int:
"""
Get Karma for Keyword
Args:
keyword (str)
Returns:
int
"""
try:
async with ClientSession() as session:
async with await session.post(
self.karma_retrieval_url,
json={"keyword": keyword},
headers={
"content-type": "application/json; charset=utf-8",
"X-Authd-With": f"Bearer {constants.KARMA_API_KEY}",
},
timeout=ClientTimeout(connect=3, sock_read=5),
) as request:
resp = await request.json()
return resp.get("count")
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return False
async def get_top(self, n: int = 10) -> Optional[dict]:
"""
Get top (n=10) Karma
Args:
n (int): Number of top results to return, default 10
Returns:
Optional[dict]
"""
try:
async with ClientSession() as session:
async with await session.post(
self.karma_top_10_url,
json={
"n": n,
},
headers={
"content-type": "application/json; charset=utf-8",
"X-Authd-With": f"Bearer {constants.KARMA_API_KEY}",
},
timeout=ClientTimeout(connect=3, sock_read=5),
) as request:
resp: dict = await request.json()
return resp
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return None
async def get_top_embed(self, n: int = 10) -> Optional[discord.Embed]:
"""
Get Top Karma Embed
Args:
n (int): Number of top results to return, default 10
Returns:
Optional[discord.Embed]
"""
top: Optional[dict] = await self.get_top(n)
if not top:
return None
top_formatted: str = ""
for x, item in enumerate(top):
top_formatted += (
f"{x + 1}. **{discord.utils.escape_markdown(item[0])}**: *{item[1]}*\n"
)
top_formatted = top_formatted.strip()
embed: discord.Embed = discord.Embed(
title=f"Top {n} Karma", description=top_formatted, colour=0xFF00FF
)
return embed
async def update_karma(
self, display: str, _id: int, keyword: str, flag: int
) -> bool:
"""
Update Karma for Keyword
Args:
display (str): Display name of the user who requested the update
_id (int): Discord UID of the user who requested the update
keyword (str): Keyword to update
flag (int)
Returns:
bool
"""
if flag not in [0, 1]:
return False
reqObj: dict = {
"granter": f"Discord: {display} ({_id})",
"keyword": keyword,
"flag": flag,
}
try:
async with ClientSession() as session:
async with await session.post(
self.karma_update_url,
json=reqObj,
headers={
"content-type": "application/json; charset=utf-8",
"X-Authd-With": f"Bearer {self.api_key}",
},
timeout=ClientTimeout(connect=3, sock_read=5),
) as request:
result = await request.json()
return result.get("success", False)
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
return False
async def check_cooldown(self, user_id: int) -> bool:
"""
Check if member has met cooldown period prior to adjusting karma
Args:
user_id (int): The Discord UID to check
Returns:
bool
"""
if user_id not in self.timers:
return True
now = int(time.time())
if (now - self.timers[user_id]) < self.karma_cooldown:
return False
return True
class Karma(commands.Cog):
"""Karma Cog for Havoc"""
def __init__(self, bot: Havoc):
importlib.reload(constants)
self.bot: Havoc = bot
self.util = Util(self.bot)
# self.karma_regex = regex.compile(r'(\w+)(\+\+|\-\-)')
self.karma_regex: Pattern = regex.compile(
r"(\b\w+(?:\s+\w+)*)(\+\+($|\s)|\-\-($|\s))"
)
self.mention_regex: Pattern = regex.compile(r"(<@([0-9]{17,20})>)(\+\+|\-\-)")
self.mention_regex_no_flag: Pattern = regex.compile(r"(<@([0-9]{17,20})>+)")
self.karma_chanid: int = 1307065684785893406
self.karma_msgid: int = 1325442184572567686
# asyncio.get_event_loop().create_task(self.bot.get_channel(self.karma_chanid).send("."))
try:
self.update_karma_chan.start()
except: # noqa
"""Safe to ignore"""
pass
@tasks.loop(seconds=30, reconnect=True)
async def update_karma_chan(self) -> None:
"""Update the Karma Chan Leaderboard"""
try:
top_embed = await self.util.get_top_embed(n=25)
channel = self.bot.get_channel(self.karma_chanid)
if not isinstance(channel, discord.TextChannel):
return
message_to_edit = await channel.fetch_message(self.karma_msgid)
await message_to_edit.edit(
embed=top_embed,
content="## This message will automatically update periodically.",
)
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
"""
Message hook, to monitor for ++/--
Also monitors for messages to #karma to autodelete, only Havoc may post in #karma!
"""
if not self.bot.user: # No valid client instance
return
if not isinstance(message.channel, discord.TextChannel):
return
if (
message.channel.id == self.karma_chanid
and not message.author.id == self.bot.user.id
):
"""Message to #karma not by Havoc, delete it"""
await message.delete(reason="Messages to #karma are not allowed")
removal_embed: discord.Embed = discord.Embed(
title="Message Deleted",
description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed.",
)
await message.author.send(embed=removal_embed)
if message.author.id == self.bot.user.id: # Bots own message
return
if not message.guild:
return
if message.guild.id not in [
1145182936002482196,
1228740575235149855,
]: # Not a valid guild for cmd
return
message_content: str = message.content.strip()
mentions: list = regex.findall(self.mention_regex, message_content)
for mention in mentions:
try:
logging.debug("Mention: %s", mention)
mentioned_uid: int = int(mention[1])
friendly_flag: int = int(mention[2])
guild: Optional[discord.Guild] = self.bot.get_guild(message.guild.id)
if not guild:
return
guild_member: Optional[discord.Member] = guild.get_member(mentioned_uid)
if not guild_member:
return
display: str = guild_member.display_name
message_content = message_content.replace(mention[0], display)
logging.debug("New message: %s", message_content)
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
message_content = discord.utils.escape_markdown(message_content)
karma_regex: list[str] = regex.findall(
self.karma_regex, message_content.strip()
)
if not karma_regex: # Not a request to adjust karma
return
flooding: bool = not await self.util.check_cooldown(message.author.id)
exempt_uids: list[int] = [1172340700663255091, 992437729927376996]
if flooding and message.author.id not in exempt_uids:
return await message.add_reaction(emoji="")
processed_keywords_lc: list[str] = []
logging.debug("Matched: %s", karma_regex)
for matched_keyword in karma_regex:
if not isinstance(matched_keyword, tuple):
continue
if len(matched_keyword) == 4:
(keyword, friendly_flag, _, __) = matched_keyword
else:
(keyword, friendly_flag) = matched_keyword
now: int = int(time.time())
flag: int = None
match friendly_flag:
case "++":
flag = 0
case "--":
flag = 1
case _:
logging.info("Unknown flag %s", flag)
continue
if keyword.lower() in processed_keywords_lc:
continue
processed_keywords_lc.append(keyword.lower())
self.util.timers[message.author.id] = now
updated: bool = await self.util.update_karma(
message.author.display_name, message.author.id, keyword, flag
)
if updated:
return await message.add_reaction(emoji="👍")
@bridge.bridge_command()
async def karma(self, ctx, *, keyword: str | None = None) -> None:
"""With no arguments, top 10 karma is provided; a keyword can also be provided to lookup."""
try:
if not keyword:
top_10_embed: Optional[discord.Embed] = await self.util.get_top_embed()
if not top_10_embed:
return
return await ctx.respond(embed=top_10_embed)
keyword = discord.utils.escape_markdown(keyword)
mentions: list[str] = regex.findall(self.mention_regex_no_flag, keyword)
for mention in mentions:
try:
mentioned_uid = int(mention[1])
guild: Optional[discord.Guild] = self.bot.get_guild(ctx.guild.id)
if not guild:
return
guild_member: Optional[discord.Member] = guild.get_member(
mentioned_uid
)
if not guild_member:
return
display = guild_member.display_name
keyword = keyword.replace(mention[0], display)
except Exception as e:
logging.debug("Exception: %s", str(e))
traceback.print_exc()
continue
score: int = await self.util.get_karma(keyword)
description: str = f"**{keyword}** has a karma of *{score}*"
embed: discord.Embed = discord.Embed(
title=f"Karma for {keyword}", description=description
)
return await ctx.respond(embed=embed)
except Exception as e:
await ctx.respond(f"Error: {str(e)}")
traceback.print_exc()
def cog_unload(self) -> None:
try:
self.update_karma_chan.cancel()
except: # noqa
"""Safe to ignore"""
pass
def setup(bot) -> None:
"""Run on Cog Load"""
bot.add_cog(Karma(bot))

View File

@ -16,6 +16,7 @@ import discord
from disc_havoc import Havoc
from aiohttp import ClientSession
from discord.ext import bridge, commands, tasks
from util.discord_helpers import log_to_playground
from util.jesusmemes import JesusMemeGenerator
import scrapers.reddit_scrape as memeg
import scrapers.explosm_scrape as explosmg
@ -209,7 +210,7 @@ class Meme(commands.Cog):
else:
self.meme_leaderboard[uid] += 1
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
async with sqlite3.connect(self.stats_db_path, timeout=5) as db_conn:
"""Attempts both insert/update"""
query_1: str = "UPDATE memes SET count = count + 1 WHERE discord_uid = ?"
query_1_params: tuple = (uid,)
@ -238,7 +239,7 @@ class Meme(commands.Cog):
INIT MEME LEADERBOARD
"""
self.meme_leaderboard: dict[int, int] = {}
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
async with sqlite3.connect(self.stats_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
db_query: str = "SELECT discord_uid, count FROM memes WHERE count > 0"
async with db_conn.execute(db_query) as db_cursor:
@ -254,6 +255,7 @@ class Meme(commands.Cog):
"""
INSERT MEME -> SQLITE DB
"""
try:
try:
image.seek(0)
_image = Image.open(image)
@ -262,7 +264,7 @@ class Meme(commands.Cog):
phash: str = str(imagehash.phash(_image))
query: str = "INSERT INTO memes(discord_uid, timestamp, image, message_ids, phash) VALUES(?, ?, ?, ?, ?)"
image.seek(0)
async with sqlite3.connect(self.memedb_path, timeout=2) as db_conn:
async with sqlite3.connect(self.memedb_path, timeout=5) as db_conn:
insert = await db_conn.execute_insert(
query, (discord_uid, timestamp, image.read(), message_id, phash)
)
@ -270,11 +272,18 @@ class Meme(commands.Cog):
await db_conn.commit()
return True
return None
except Exception as e:
await log_to_playground(
self.bot,
f"Exception occurred while attempting to insert meme (message id: {message_id}):\n{str(e)}",
)
return None
async def dupe_check(self, image) -> bool | int:
"""
CHECK DB FOR DUPLICATE MEMES!
"""
try:
phash: str = str(imagehash.phash(image))
query: str = "SELECT message_ids FROM memes WHERE phash = ? LIMIT 1"
async with sqlite3.connect(self.memedb_path, timeout=2) as db_conn:
@ -284,6 +293,12 @@ class Meme(commands.Cog):
if result:
return result["message_ids"]
return False
except Exception as e:
await log_to_playground(
self.bot,
f"Exception occurred while checking image for duplicates in DB:\n{str(e)}",
)
return False
@commands.Cog.listener()
async def on_ready(self) -> None:
@ -753,7 +768,7 @@ class Meme(commands.Cog):
"""
try:
out_top: list[tuple[int, int]] = []
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
async with sqlite3.connect(self.stats_db_path, timeout=5) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC"
async with db_conn.execute(query) as db_cursor:

View File

@ -46,9 +46,6 @@ class Sing(commands.Cog):
"""
try:
with ctx.channel.typing():
interaction: bool = isinstance(
ctx, discord.ext.bridge.BridgeApplicationContext
)
activity: Optional[discord.Activity] = None
if not song:
if not ctx.author.activities:
@ -63,9 +60,9 @@ class Sing(commands.Cog):
"**Error**: No song specified, no activity found to read."
)
if interaction:
await ctx.respond(
"*Searching...*"
"*Searching...*",
ephemeral=True
) # Must respond to interactions within 3 seconds, per Discord
parsed = self.utility.parse_song_input(song, activity)
@ -112,15 +109,11 @@ class Sing(commands.Cog):
f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}"
)
c: int = 0
out_messages: list = []
footer: str = "" # Placeholder
for section in search_result_wrapped:
c += 1
for c, section in enumerate(search_result_wrapped):
if c == len(search_result_wrapped):
footer = f"`Found on: {search_result_src}`"
# if ctx.guild.id == 1145182936002482196:
# section = section.upper()
section = regex.sub(r"\p{Vert_Space}", " / ", section.strip())
msg: str = f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}"
if c > 1:

View File

@ -24,7 +24,6 @@ cogs_list: list[str] = [
"owner",
"sing",
"meme",
"karma",
"lovehate",
"radio",
]

View File

@ -58,3 +58,15 @@ async def send_message(
if not isinstance(_channel, discord.TextChannel):
return None
await _channel.send(message)
async def log_to_playground(bot: discord.Bot, message: str) -> None:
"""
Send {message} to playground/log chan
Args:
bot (discord.Bot): Bot instance
message (str): The message to send
Returns:
None
"""
return await send_message(bot=bot, channel="havoc-playground", message=message)

View File

@ -208,7 +208,7 @@ class Util:
) as db_cursor:
if db_cursor.rowcount < 0:
logging.critical(
"[karma::increment_counter] Fail! %s", db_cursor.rowcount
"[increment_counter] Fail! %s", db_cursor.rowcount
)
return False
await db_conn.commit()