#!/usr/bin/env python3.12 # pylint: disable=bare-except, broad-exception-caught """ Quote cog for Havoc """ import traceback import time import os import datetime from typing import Optional import asyncio import discord import aiosqlite as sqlite3 from discord.ext import bridge, commands from disc_havoc import Havoc class DB: """DB Utility for Quote Cog""" def __init__(self, bot: Havoc): self.bot: Havoc = bot self.db_path = os.path.join("/", "usr", "local", "share", "sqlite_dbs", "quotes.db") self.hp_chanid = 1157529874936909934 async def get_quote_count(self): """Get Quote Count""" async with sqlite3.connect(self.db_path, timeout=2) as db_conn: async with await db_conn.execute("SELECT COUNT (*) FROM quotes") as db_cursor: result = await db_cursor.fetchone() return result[-1] async def remove_quote(self, quote_id: int): """Remove Quote from DB""" try: async with sqlite3.connect(self.db_path, timeout=2) as db_conn: async with await db_conn.execute("DELETE FROM quotes WHERE id = ?", (quote_id,)) as _: await db_conn.commit() return True except Exception as e: # noqa _channel = self.bot.get_channel(self.hp_chanid) if isinstance(_channel, discord.TextChannel): await _channel.send(traceback.format_exc()) return False async def add_quote(self, message_id: int, channel_id: int, quoted_member_id: int, message_time: int, quoter_friendly: str, quoted_friendly: str, channel_friendly: str, message_content: str, ): """Add Quote to DB""" params = ( quoter_friendly, int(time.time()), quoted_friendly, quoted_member_id, channel_friendly, channel_id, message_id, message_time, quoter_friendly, message_content, ) try: async with sqlite3.connect(self.db_path, timeout=2) as db_conn: # pylint: disable=line-too-long db_conn.row_factory = sqlite3.Row async with await db_conn.execute("INSERT INTO quotes (added_by, added_at, quoted_user_display, quoted_user_memberid, quoted_channel_display, quoted_channel_id, quoted_message_id, quoted_message_time, added_by_friendly, quoted_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", params) as _: # pylint: enable=line-too-long await db_conn.commit() return True except Exception as e: # noqa return traceback.format_exc() async def fetch_quote(self, random: bool = False, quoteid: Optional[int] = None, added_by: Optional[str] = None, quoted_user: Optional[str] = None, content: Optional[str] = None): """Fetch Quote from DB""" try: query_head = "SELECT id, added_by_friendly, added_at, quoted_user_display, quoted_channel_display, quoted_message_time, quoted_message FROM quotes" query = "" params: Optional[tuple] = None if random: query = f"{query_head} ORDER BY RANDOM() LIMIT 1" elif quoteid: query = f"{query_head} WHERE id = ? LIMIT 1" params = (quoteid,) elif added_by: query = f"{query_head} WHERE added_by_friendly LIKE ? ORDER BY RANDOM() LIMIT 5" params = (f"%{added_by}%",) elif quoted_user: query = f"{query_head} WHERE quoted_user_display LIKE ? ORDER BY RANDOM() LIMIT 5" params = (f"%{quoted_user}%",) elif content: query = f"{query_head} WHERE quoted_message LIKE ? ORDER BY RANDOM() LIMIT 5" params = (f"%{content}%",) async with sqlite3.connect(self.db_path, timeout=2) as db_conn: db_conn.row_factory = sqlite3.Row async with await db_conn.execute(query, params) as db_cursor: results = await db_cursor.fetchall() if not results: return { 'err': 'No results for query', } if random or quoteid: chosen = results[-1] return { str(k): v for k,v in chosen.items() } else: return [ { str(k): v for k,v in _.items() } for _ in results ] except Exception as e: # noqa return traceback.format_exc() class Quote(commands.Cog): """Quote Cog for Havoc""" def __init__(self, bot: Havoc): self.bot: Havoc = bot self.db = DB(self.bot) def is_homeserver(): # type: ignore """Check if channel/interaction is within homeserver""" def predicate(ctx): try: return ctx.guild.id == 1145182936002482196 except Exception as e: # noqa traceback.print_exc() return False return commands.check(predicate) @commands.message_command(name="Add Quote") async def add_quote(self, ctx, message: discord.Message): """Add A Quote""" hp_chanid = 1157529874936909934 try: if message.author.bot: return await ctx.respond("Quotes are for real users, not bots.", ephemeral=True) quoter_friendly = ctx.author.display_name quoted_message_id = message.id quoted_channel_friendly = f'#{ctx.channel.name}' quoted_channel_id = ctx.channel.id message_content = message.content message_author_friendly = message.author.display_name message_author_id = message.author.id message_time = int(message.created_at.timestamp()) message_escaped = discord.utils.escape_mentions(discord.utils.escape_markdown(message_content)).strip() if len(message_escaped) < 3: return await ctx.respond("**Error**: Message (text content) is not long enough to quote.", ephemeral=True) if len(message_escaped) > 512: return await ctx.respond("**Error**: Message (text content) is too long to quote.", ephemeral=True) result = await self.db.add_quote(message_id=quoted_message_id, channel_id=quoted_channel_id, quoted_member_id=message_author_id, message_time=message_time, quoter_friendly=quoter_friendly, quoted_friendly=message_author_friendly, channel_friendly=quoted_channel_friendly, message_content=message_content) if not result: return await ctx.respond("Failed!", ephemeral=True) else: return await ctx.respond("OK!", ephemeral=True) except Exception as e: # noqa _channel = self.bot.get_channel(hp_chanid) if not isinstance(_channel, discord.TextChannel): return await _channel.send(traceback.format_exc()) @bridge.bridge_command(aliases=['rand']) @is_homeserver() # pylint: disable=too-many-function-args async def randquote(self, ctx): """Get a random quote""" try: random_quote = await self.db.fetch_quote(random=True) if random_quote.get('err'): return await ctx.respond("Failed to get a quote") quote_id = random_quote.get('id') quoted_friendly = random_quote.get('quoted_user_display', 'Unknown') adder_friendly = random_quote.get('added_by_friendly', 'Unknown') message_time = datetime.datetime.fromtimestamp(random_quote.get('quoted_message_time')) message_channel = random_quote.get('quoted_channel_display') quote_added_at = datetime.datetime.fromtimestamp(random_quote.get('added_at')) quote_content = random_quote.get('quoted_message') embed = discord.Embed( colour=discord.Colour.orange(), title=f"Quote #{quote_id}", ) embed.description = f"**{quoted_friendly}:** {quote_content}" embed.add_field(name="Original Message Time", value=message_time) embed.add_field(name="Channel", value=message_channel) embed.add_field(name="Quote ID", value=quote_id) embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") return await ctx.respond(embed=embed) except Exception as e: # noqa error = await ctx.respond(traceback.format_exc()) await asyncio.sleep(10) await error.delete() @bridge.bridge_command(aliases=['qg']) @is_homeserver() # pylint: disable=too-many-function-args async def quoteget(self, ctx, quoteid): """Get a specific quote by ID""" try: if not str(quoteid).strip().isnumeric(): return await ctx.respond("**Error**: Quote ID must be numeric.") fetched_quote = await self.db.fetch_quote(quoteid=quoteid) if fetched_quote.get('err'): return await ctx.respond("**Error**: Quote not found") quote_id = fetched_quote.get('id') quoted_friendly = fetched_quote.get('quoted_user_display', 'Unknown') adder_friendly = fetched_quote.get('added_by_friendly', 'Unknown') message_time = datetime.datetime.fromtimestamp(fetched_quote.get('quoted_message_time')) message_channel = fetched_quote.get('quoted_channel_display') quote_added_at = datetime.datetime.fromtimestamp(fetched_quote.get('added_at')) quote_content = fetched_quote.get('quoted_message') embed = discord.Embed( colour=discord.Colour.orange(), title=f"Quote #{quote_id}", ) embed.description = f"**{quoted_friendly}:** {quote_content}" embed.add_field(name="Original Message Time", value=message_time) embed.add_field(name="Channel", value=message_channel) embed.add_field(name="Quote ID", value=quote_id) embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") return await ctx.respond(embed=embed) except Exception as e: # noqa error = await ctx.respond(traceback.format_exc()) await asyncio.sleep(10) await error.delete() @bridge.bridge_command(aliases=['qs']) @is_homeserver() # pylint: disable=too-many-function-args async def quotesearch(self, ctx, *, content: str): """Search for a quote (by content)""" try: found_quotes = await self.db.fetch_quote(content=content) if isinstance(found_quotes, dict) and found_quotes.get('err'): return await ctx.respond(f"Quote search failed: {found_quotes.get('err')}") embeds = [] for quote in found_quotes: quote_id = quote.get('id') quoted_friendly = quote.get('quoted_user_display', 'Unknown') adder_friendly = quote.get('added_by_friendly', 'Unknown') message_time = datetime.datetime.fromtimestamp(quote.get('quoted_message_time')) message_channel = quote.get('quoted_channel_display') quote_added_at = datetime.datetime.fromtimestamp(quote.get('added_at')) quote_content = quote.get('quoted_message') # await ctx.respond(f"**{quoted_friendly}**: {quote}")ed_friendly = quote.get('quoted_user_display', 'Unknown') adder_friendly = quote.get('added_by_friendly', 'Unknown') message_time = datetime.datetime.fromtimestamp(quote.get('quoted_message_time')) message_channel = quote.get('quoted_channel_display') quote_added_at = datetime.datetime.fromtimestamp(quote.get('added_at')) quote = quote.get('quoted_message') # await ctx.respond(f"**{quoted_friendly}**: {quote}") embed = discord.Embed( colour=discord.Colour.orange(), title=f"Quote #{quote_id}", ) embed.description = f"**{quoted_friendly}:** {quote_content}" embed.add_field(name="Original Message Time", value=str(message_time)) embed.add_field(name="Channel", value=message_channel) embed.add_field(name="Quote ID", value=quote_id) embed.footer = discord.EmbedFooter(text=f"Added by {adder_friendly} {quote_added_at}") embeds.append(embed) return await ctx.respond(embeds=embeds) except Exception as e: await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") @bridge.bridge_command(aliases=['nq']) @is_homeserver() # pylint: disable=too-many-function-args async def nquotes(self, ctx): """Get # of quotes stored""" try: quote_count = await self.db.get_quote_count() if not quote_count: return await ctx.respond("**Error**: No quotes found!") return await ctx.respond(f"I currently have **{quote_count}** quotes stored.") except Exception as e: await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") @bridge.bridge_command(aliases=['qr']) @commands.is_owner() @is_homeserver() # pylint: disable=too-many-function-args async def quoteremove(self, ctx, quoteid): """Remove a quote (by id) Owner only""" try: if not str(quoteid).strip().isnumeric(): return await ctx.respond("**Error**: Quote ID must be numeric.") quoteid = int(quoteid) remove_quote = await self.db.remove_quote(quoteid) if not remove_quote: return await ctx.respond("**Error**: Failed!", ephemeral=True) return await ctx.respond("Removed!", ephemeral=True) except Exception as e: await ctx.respond(f"Error: {type(e).__name__} - {str(e)}") def setup(bot): """Run on Cog Load""" bot.add_cog(Quote(bot))