diff --git a/cogs/quote.py b/cogs/quote.py new file mode 100644 index 0000000..7798076 --- /dev/null +++ b/cogs/quote.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3.12 +# pylint: disable=bare-except, broad-exception-caught + +""" +Quote cog for Havoc +""" + +import traceback +import time +import os +import datetime +from typing import Optional +import asyncio +import discord +import aiosqlite as sqlite3 +from discord.ext import bridge, commands +from disc_havoc import Havoc + +class DB: + """DB Utility for Quote Cog""" + def __init__(self, bot: Havoc): + self.bot: Havoc = bot + self.db_path = os.path.join("/", "usr", "local", "share", + "sqlite_dbs", "quotes.db") + self.hp_chanid = 1157529874936909934 + + + async def get_quote_count(self): + """Get Quote Count""" + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + async with await db_conn.execute("SELECT COUNT (*) FROM quotes") as db_cursor: + result = await db_cursor.fetchone() + return result[-1] + + async def remove_quote(self, quote_id: int): + """Remove Quote from DB""" + try: + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + async with await db_conn.execute("DELETE FROM quotes WHERE id = ?", (quote_id,)) as _: + await db_conn.commit() + return True + except Exception as e: # noqa + _channel = self.bot.get_channel(self.hp_chanid) + if isinstance(_channel, discord.TextChannel): + await _channel.send(traceback.format_exc()) + return False + + async def add_quote(self, message_id: int, channel_id: int, + quoted_member_id: int, + message_time: int, + quoter_friendly: str, + quoted_friendly: str, + channel_friendly: str, + message_content: str, + ): + """Add Quote to DB""" + params = ( + quoter_friendly, + int(time.time()), + quoted_friendly, + quoted_member_id, + channel_friendly, + channel_id, + message_id, + message_time, + quoter_friendly, + message_content, + ) + + try: + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + # pylint: disable=line-too-long + db_conn.row_factory = sqlite3.Row + async with await db_conn.execute("INSERT INTO quotes (added_by, added_at, quoted_user_display, quoted_user_memberid, quoted_channel_display, quoted_channel_id, quoted_message_id, quoted_message_time, added_by_friendly, quoted_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + params) as _: # pylint: enable=line-too-long + await db_conn.commit() + return True + except Exception as e: # noqa + return traceback.format_exc() + + async def fetch_quote(self, random: bool = False, quoteid: Optional[int] = None, + added_by: Optional[str] = None, quoted_user: Optional[str] = None, + content: Optional[str] = None): + """Fetch Quote from DB""" + try: + query_head = "SELECT id, added_by_friendly, added_at, quoted_user_display, quoted_channel_display, quoted_message_time, quoted_message FROM quotes" + query = "" + params: Optional[tuple] = None + + if random: + query = f"{query_head} ORDER BY RANDOM() LIMIT 1" + elif quoteid: + query = f"{query_head} WHERE id = ? LIMIT 1" + params = (quoteid,) + elif added_by: + query = f"{query_head} WHERE added_by_friendly LIKE ? ORDER BY RANDOM() LIMIT 5" + params = (f"%{added_by}%",) + elif quoted_user: + query = f"{query_head} WHERE quoted_user_display LIKE ? ORDER BY RANDOM() LIMIT 5" + params = (f"%{quoted_user}%",) + elif content: + query = f"{query_head} WHERE quoted_message LIKE ? ORDER BY RANDOM() LIMIT 5" + params = (f"%{content}%",) + + async with sqlite3.connect(self.db_path, timeout=2) as db_conn: + db_conn.row_factory = sqlite3.Row + async with await db_conn.execute(query, params) as db_cursor: + results = await db_cursor.fetchall() + if not results: + return { + 'err': 'No results for query', + } + if random or quoteid: + chosen = results[-1] + return { + str(k): v for k,v in chosen.items() + } + else: + return [ + { str(k): v for k,v in _.items() } + for _ in results + ] + except Exception as e: # noqa + return traceback.format_exc() + + +class Quote(commands.Cog): + """Quote Cog for Havoc""" + def __init__(self, bot: Havoc): + self.bot: Havoc = bot + self.db = DB(self.bot) + + def is_homeserver(): # type: ignore + """Check if channel/interaction is within homeserver""" + def predicate(ctx): + try: + return ctx.guild.id == 1145182936002482196 + except Exception as e: # noqa + traceback.print_exc() + return False + return commands.check(predicate) + + @commands.message_command(name="Add Quote") + async def add_quote(self, ctx, message: discord.Message): + """Add A Quote""" + hp_chanid = 1157529874936909934 + try: + if message.author.bot: + return await ctx.respond("Quotes are for real users, not bots.", ephemeral=True) + quoter_friendly = ctx.author.display_name + quoted_message_id = message.id + quoted_channel_friendly = f'#{ctx.channel.name}' + quoted_channel_id = ctx.channel.id + message_content = message.content + message_author_friendly = message.author.display_name + message_author_id = message.author.id + message_time = int(message.created_at.timestamp()) + message_escaped = discord.utils.escape_mentions(discord.utils.escape_markdown(message_content)).strip() + + if len(message_escaped) < 3: + return await ctx.respond("**Error**: Message (text content) is not long enough to quote.", ephemeral=True) + + if len(message_escaped) > 512: + return await ctx.respond("**Error**: Message (text content) is too long to quote.", ephemeral=True) + + result = await self.db.add_quote(message_id=quoted_message_id, + channel_id=quoted_channel_id, + quoted_member_id=message_author_id, + message_time=message_time, + quoter_friendly=quoter_friendly, + quoted_friendly=message_author_friendly, + channel_friendly=quoted_channel_friendly, + message_content=message_content) + if not result: + return await ctx.respond("Failed!", ephemeral=True) + else: + return await ctx.respond("OK!", ephemeral=True) + except Exception as e: # noqa + _channel = self.bot.get_channel(hp_chanid) + if not isinstance(_channel, discord.TextChannel): + return + await _channel.send(traceback.format_exc()) + + + @bridge.bridge_command(aliases=['rand']) + @is_homeserver() # pylint: disable=too-many-function-args + async def randquote(self, ctx): + """Get a random quote""" + try: + random_quote = await self.db.fetch_quote(random=True) + if random_quote.get('err'): + return await ctx.respond("Failed to get a quote") + + quote_id = random_quote.get('id') + quoted_friendly = random_quote.get('quoted_user_display', 'Unknown') + adder_friendly = random_quote.get('added_by_friendly', 'Unknown') + message_time = datetime.datetime.fromtimestamp(random_quote.get('quoted_message_time')) + message_channel = random_quote.get('quoted_channel_display') + quote_added_at = datetime.datetime.fromtimestamp(random_quote.get('added_at')) + quote_content = random_quote.get('quoted_message') + + embed = discord.Embed( + colour=discord.Colour.orange(), + title=f"Quote #{quote_id}", + ) + embed.description = f"**{quoted_friendly}:** {quote_content}" + embed.add_field(name="Original Message Time", value=message_time) + embed.add_field(name="Channel", value=message_channel) + embed.add_field(name="Quote ID", value=quote_id) + embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") + + return await ctx.respond(embed=embed) + except Exception as e: # noqa + error = await ctx.respond(traceback.format_exc()) + await asyncio.sleep(10) + await error.delete() + + @bridge.bridge_command(aliases=['qg']) + @is_homeserver() # pylint: disable=too-many-function-args + async def quoteget(self, ctx, quoteid): + """Get a specific quote by ID""" + try: + if not str(quoteid).strip().isnumeric(): + return await ctx.respond("**Error**: Quote ID must be numeric.") + fetched_quote = await self.db.fetch_quote(quoteid=quoteid) + if fetched_quote.get('err'): + return await ctx.respond("**Error**: Quote not found") + + quote_id = fetched_quote.get('id') + quoted_friendly = fetched_quote.get('quoted_user_display', 'Unknown') + adder_friendly = fetched_quote.get('added_by_friendly', 'Unknown') + message_time = datetime.datetime.fromtimestamp(fetched_quote.get('quoted_message_time')) + message_channel = fetched_quote.get('quoted_channel_display') + quote_added_at = datetime.datetime.fromtimestamp(fetched_quote.get('added_at')) + quote_content = fetched_quote.get('quoted_message') + + embed = discord.Embed( + colour=discord.Colour.orange(), + title=f"Quote #{quote_id}", + ) + embed.description = f"**{quoted_friendly}:** {quote_content}" + embed.add_field(name="Original Message Time", value=message_time) + embed.add_field(name="Channel", value=message_channel) + embed.add_field(name="Quote ID", value=quote_id) + embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") + + return await ctx.respond(embed=embed) + except Exception as e: # noqa + error = await ctx.respond(traceback.format_exc()) + await asyncio.sleep(10) + await error.delete() + + @bridge.bridge_command(aliases=['qs']) + @is_homeserver() # pylint: disable=too-many-function-args + async def quotesearch(self, ctx, *, content: str): + """Search for a quote (by content)""" + try: + found_quotes = await self.db.fetch_quote(content=content) + if isinstance(found_quotes, dict) and found_quotes.get('err'): + return await ctx.respond(f"Quote search failed: {found_quotes.get('err')}") + + embeds = [] + + for quote in found_quotes: + quote_id = quote.get('id') + quoted_friendly = quote.get('quoted_user_display', 'Unknown') + adder_friendly = quote.get('added_by_friendly', 'Unknown') + message_time = datetime.datetime.fromtimestamp(quote.get('quoted_message_time')) + message_channel = quote.get('quoted_channel_display') + quote_added_at = datetime.datetime.fromtimestamp(quote.get('added_at')) + quote_content = quote.get('quoted_message') + + # await ctx.respond(f"**{quoted_friendly}**: {quote}")ed_friendly = quote.get('quoted_user_display', 'Unknown') + adder_friendly = quote.get('added_by_friendly', 'Unknown') + message_time = datetime.datetime.fromtimestamp(quote.get('quoted_message_time')) + message_channel = quote.get('quoted_channel_display') + quote_added_at = datetime.datetime.fromtimestamp(quote.get('added_at')) + quote = quote.get('quoted_message') + + # await ctx.respond(f"**{quoted_friendly}**: {quote}") + embed = discord.Embed( + colour=discord.Colour.orange(), + title=f"Quote #{quote_id}", + ) + embed.description = f"**{quoted_friendly}:** {quote_content}" + embed.add_field(name="Original Message Time", value=str(message_time)) + embed.add_field(name="Channel", value=message_channel) + embed.add_field(name="Quote ID", value=quote_id) + embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") + embeds.append(embed) + + return await ctx.respond(embeds=embeds) + except Exception as e: + await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") + + @bridge.bridge_command(aliases=['nq']) + @is_homeserver() # pylint: disable=too-many-function-args + async def nquotes(self, ctx): + """Get # of quotes stored""" + try: + quote_count = await self.db.get_quote_count() + if not quote_count: + return await ctx.respond("**Error**: No quotes found!") + return await ctx.respond(f"I currently have **{quote_count}** quotes stored.") + + except Exception as e: + await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") + + @bridge.bridge_command(aliases=['qr']) + @commands.is_owner() + @is_homeserver() # pylint: disable=too-many-function-args + async def quoteremove(self, ctx, quoteid): + """Remove a quote (by id) + Owner only""" + try: + if not str(quoteid).strip().isnumeric(): + return await ctx.respond("**Error**: Quote ID must be numeric.") + quoteid = int(quoteid) + remove_quote = await self.db.remove_quote(quoteid) + if not remove_quote: + return await ctx.respond("**Error**: Failed!", ephemeral=True) + return await ctx.respond("Removed!", ephemeral=True) + + except Exception as e: + await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") + +def setup(bot): + """Run on Cog Load""" + bot.add_cog(Quote(bot))