import os
import logging
import traceback
import random
import datetime
import pytz
from typing import Optional, LiteralString, Union
import regex
import aiosqlite as sqlite3
from aiohttp import ClientSession, ClientTimeout
from bohancompliment import ComplimentGenerator
from discord import Embed


class Util:
    """Misc Utility

    Grab-bag of helpers: random rows from local SQLite databases, a few
    small HTTP API lookups, counter bookkeeping in the stats database, and
    some date/coffee conveniences.
    """

    def __init__(self) -> None:
        self.URL_URBANDICTIONARY: str = "http://api.urbandictionary.com/v0/define"
        self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult"
        self.COMPLIMENT_GENERATOR = ComplimentGenerator()
        # Logical database name -> path of the SQLite file on disk.
        self.dbs: dict[str, str|LiteralString] = {
            'whisky': os.path.join("/usr/local/share", "sqlite_dbs", "whiskey.db"),
            'drinks': os.path.join("/usr/local/share", "sqlite_dbs", "cocktails.db"),
            'strains': os.path.join("/usr/local/share", "sqlite_dbs", "strains.db"),
            'qajoke': os.path.join("/usr/local/share", "sqlite_dbs", "qajoke.db"),
            'rjokes': os.path.join("/usr/local/share", "sqlite_dbs", "rjokes.db"),
            'randmsg': os.path.join("/usr/local/share", "sqlite_dbs", "randmsg.db"),
            'stats': os.path.join("/usr/local/share", "sqlite_dbs", "havoc_stats.db"),
            'cookies': os.path.join("/usr/local/share", "sqlite_dbs", "cookies.db"),
        }
        self.COFFEES: list = [
            'a cup of french-pressed coffee', 'a cup of cold brew',
            'a cup of flash brew', 'a cup of Turkish coffee',
            'a cup of Moka', 'an espresso', 'a cup of Nescafe coffee',
            'an iced coffee', 'a Frappé', 'a freddo cappuccino',
            'a cup of Chock full o\'Nuts', 'a cup of Folgers',
            'a cup of Lavazza', 'a cup of Maxwell House',
            'a cup of Moccona', 'a cup of Mr. Brown Coffee',
            'a cup of affogato al caffè', 'a cup of Caffè Medici',
            'a cup of Café Touba', 'a double-double',
            'an indian filter coffee', 'a cup of pocillo',
            'a cup of caffè americano', 'a cup of caffè lungo',
            'a latte', 'a manilo', 'a flat white',
            'a cup of café cubano', 'a cup of caffè crema',
            'a cup of cafe zorro', 'an espresso roberto',
            'an espresso romano', 'an espresso sara', 'a guillermo',
            'a ristretto', 'a cup of melya', 'a cup of caffè marocchino',
            'a cup of café miel', 'a cup of café de olla', 'a Mazagran',
            'a Palazzo', 'an ice shot', 'a macchiato', 'a cortado',
            'a red eye', 'a cappuccino', 'a mocha', 'a café au lait',
            'a bicerin', 'a caffè corretto', 'a ca phe trung',
            'a café bombón', 'a Vienna coffee', 'a flat black',
            'a lungo', 'a doppio', 'a ristretto bianco',
            'a piccolo latte', 'a gibraltar', 'a breve',
            'a café con leche', 'a su café', 'a café del tiempo',
            'a java chip frappuccino', 'a pumpkin spice latte',
            'a caramel macchiato', 'a white chocolate mocha',
            'a hazelnut coffee', 'a toffee nut latte',
            'a peppermint mocha', 'a cinnamon dolce latte',
            'a coconut milk latte', 'an almond milk cappuccino',
            'an oat milk latte', 'a caramel frappuccino',
            'a chocolate frappuccino', 'a butter pecan coffee',
            'a maple pecan latte', 'a sea salt caramel mocha',
            'a nitro cold brew', 'a pumpkin cold brew',
            'a honey almond flat white', 'a sweet cream cold brew',
            'a matcha latte', 'a golden latte', 'a turmeric latte',
            'a beetroot latte', 'a Kopi luwak',
        ]
        # Most recently served coffees, oldest first (capped at 5 entries).
        self.LAST_5_COFFEES: list = []

    @staticmethod
    def _is_safe_identifier(name: str) -> bool:
        """
        Check whether a string is safe to interpolate as an SQL column name
        Args:
            name (str)
        Returns:
            bool
        """
        # SQL parameters can only bind values, never identifiers, so column
        # names have to be interpolated; restrict them to [A-Za-z_][A-Za-z0-9_]*.
        return isinstance(name, str) and name.isascii() and name.isidentifier()

    @staticmethod
    def _collapse_whitespace(text: str) -> str:
        """
        Collapse whitespace runs to one space, then drop CR/LF and a leading space
        Args:
            text (str)
        Returns:
            str
        """
        collapsed: str = regex.sub(r'\p{White_Space}{2,}', ' ', text.strip())
        return regex.sub(r'(^\p{White_Space}|\r|\n)', '', collapsed)

    def tdTuple(self, td:datetime.timedelta) -> tuple:
        """
        Create TimeDelta Tuple
        Args:
            td (datetime.timedelta)
        Returns:
            tuple: (days, hours, minutes, seconds, leftover microseconds,
                    milliseconds) -- note microseconds come before milliseconds
        """
        hours, remainder = divmod(td.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        millis, micros = divmod(td.microseconds, 1000)
        return (td.days, hours, minutes, seconds, micros, millis)

    async def get_counter(self, counter: Optional[str] = None) -> Optional[dict]:
        """
        Get Counter
        Args:
            counter (Optional[str]): Column to fetch; all columns when omitted
        Returns:
            Optional[dict]: Column name -> value for the first row of `stats`;
                None if the db is not configured, the counter name is not a
                plain identifier, or the table has no rows
        """
        stats_db: str|LiteralString = self.dbs.get('stats', '')
        if not stats_db:
            return None
        if counter:
            if not self._is_safe_identifier(counter):
                logging.warning("[get_counter] Rejected counter name: %r", counter)
                return None
            # A bound "?" would select the literal string, not the column.
            query: str = f"SELECT {counter} FROM stats LIMIT 1"
        else:
            query = "SELECT * FROM stats LIMIT 1"
        async with sqlite3.connect(stats_db, timeout=3) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with db_conn.execute(query) as db_cursor:
                result = await db_cursor.fetchone()
                # Convert the Row into a real dict so callers can use .items().
                return dict(result) if result is not None else None

    async def get_stats_embed(self) -> Optional[Embed]:
        """
        Get Stats Embed
        Returns:
            Optional[Embed]: Embed listing every counter, highest value first;
                None when no counters are available
        """
        counters: Optional[dict] = await self.get_counter()
        if not counters:
            return None
        embed: Embed = Embed(title="Stats")
        ranked: list = sorted(counters.items(),
                              key=lambda item: item[1], reverse=True)
        lines: list[str] = []
        for counter, value in ranked:
            # e.g. "some_counter" -> "Some Counter"
            label: str = regex.sub(r'_', ' ', counter.strip()).title()
            lines.append(f"- {value} {label}")
        embed.description = "\n".join(lines).strip()
        return embed

    async def increment_counter(self, counter: str) -> bool:
        """
        Increment Counter
        Args:
            counter (str): Column of `stats` to increment by one
        Returns:
            bool: True if at least one row was updated and committed
        """
        stats_db: str|LiteralString = self.dbs.get('stats', '')
        if not stats_db:
            return False
        if not self._is_safe_identifier(counter):
            # The name is interpolated into SQL below; refuse anything unusual.
            logging.warning("[increment_counter] Rejected counter name: %r", counter)
            return False
        async with sqlite3.connect(stats_db, timeout=3) as db_conn:
            async with db_conn.execute(
                f"UPDATE stats SET {counter} = {counter} + 1") as db_cursor:
                # rowcount is 0 (never negative) when the UPDATE touched nothing.
                if db_cursor.rowcount < 1:
                    logging.critical("[karma::increment_counter] Fail! %s",
                                     db_cursor.rowcount)
                    return False
                await db_conn.commit()
                return True

    async def get_ud_def(self, term: str) -> tuple[str, str]:
        """
        Get Definition from UD
        Args:
            term (str)
        Returns:
            tuple[str, str]: (word, definition); on any failure the word is
                the requested term and the second element describes the problem
        """
        try:
            async with ClientSession() as session:
                async with session.get(
                    self.URL_URBANDICTIONARY,
                    params={
                        "term": term,
                    },
                    headers={
                        'content-type': 'application/json; charset=utf-8',
                    },
                    timeout=ClientTimeout(connect=5, sock_read=5),
                ) as request:
                    logging.debug("UD returned: %s", await request.text())
                    data: dict = await request.json()
                    if "list" not in data:
                        return (term, "Error retrieving data from Urban Dictionary")
                    definitions: list[dict] = data["list"]
                    if not definitions:
                        return (term, "Not found!")
                    definition: dict = definitions[0]
                    definition_word: str = definition.get("word", "N/A")
                    definition_text: str = regex.sub(
                        r'(\r|\n|\r\n)', ' ', definition["definition"].strip())
                    # Tuple: Returned word, returned definition
                    return (definition_word, definition_text)
        except Exception as e:
            traceback.print_exc()
            return (term, f"ERR: {str(e)}")

    async def get_insult(self, recipient: str) -> str:
        """
        Get Insult
        Args:
            recipient (str)
        Returns:
            str
        Raises:
            aiohttp.ClientResponseError: On a 4xx/5xx response
        """
        async with ClientSession() as session:
            # Pass the recipient via params so "&", "#", etc. are URL-encoded
            # instead of altering the query string.
            async with session.get(
                self.URL_INSULTAPI,
                params={"who": recipient},
                timeout=ClientTimeout(connect=5, sock_read=5),
            ) as request:
                request.raise_for_status()
                return await request.text()

    async def get_compliment(self, subject: str,
                             language: Optional[str] = None) -> str:
        """
        Get Compliment
        Args:
            subject (str)
            language (Optional[str])
        Returns:
            str
        """
        if not language:
            return self.COMPLIMENT_GENERATOR.compliment(subject)
        return self.COMPLIMENT_GENERATOR.compliment_in_language(subject, language)

    async def get_whisky(self) -> Optional[tuple]:
        """
        Get Whisky
        Returns:
            Optional[tuple]: (name, category, description), whitespace-normalised;
                None if the db is not configured or has no rows
        """
        whisky_db: str|LiteralString = self.dbs.get('whisky', '')
        if not whisky_db:
            return None
        async with sqlite3.connect(database=whisky_db, timeout=2) as db_conn:
            db_query: str = "SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1"
            async with db_conn.execute(db_query) as db_cursor:
                db_result: Optional[Union[sqlite3.Row, tuple]] = await db_cursor.fetchone()
                if not db_result:
                    return None
                (name, category, description) = db_result
                return (
                    self._collapse_whitespace(name),
                    self._collapse_whitespace(category),
                    self._collapse_whitespace(description),
                )

    async def get_drink(self) -> Optional[tuple]:
        """
        Get Drink
        Returns:
            Optional[tuple]: (name, ingredients); None if the db is not
                configured or has no rows
        """
        drinks_db: str|LiteralString = self.dbs.get('drinks', '')
        if not drinks_db:
            return None
        async with sqlite3.connect(database=drinks_db, timeout=2) as db_conn:
            db_query: str = "SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1"
            async with db_conn.execute(db_query) as db_cursor:
                db_result: Optional[tuple] = await db_cursor.fetchone()
                if not db_result:
                    return None
                (name, ingredients) = db_result
                name = self._collapse_whitespace(name)
                ingredients = self._collapse_whitespace(ingredients)
                # Swap ASCII asterisks for U+2731 (heavy asterisk).
                ingredients = regex.sub(r'\*', '\u2731', ingredients.strip())
                return (name, ingredients)

    async def get_strain(self, strain: Optional[str] = None) -> Optional[tuple]:
        """
        Get Strain
        Args:
            strain (Optional[str]): Substring to match against the name;
                a random strain is returned when omitted
        Returns:
            Optional[tuple]: (name, description), or None if nothing matched
        """
        strains_db: str|LiteralString = self.dbs.get('strains', '')
        if not strains_db:
            return None
        async with sqlite3.connect(database=strains_db, timeout=2) as db_conn:
            db_params: Optional[tuple] = None
            if not strain:
                db_query: str = "SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1"
            else:
                db_query = "SELECT name, description FROM strains_w_desc WHERE name LIKE ?"
                db_params = (f"%{strain.strip()}%",)
            async with db_conn.execute(db_query, db_params) as db_cursor:
                db_result: Optional[tuple] = await db_cursor.fetchone()
                return db_result

    async def get_qajoke(self) -> Optional[tuple]:
        """
        Get QA Joke
        Returns:
            Optional[tuple]: (question, answer); None if the db is not
                configured or has no rows
        """
        qajoke_db: str|LiteralString = self.dbs.get('qajoke', '')
        if not qajoke_db:
            return None
        async with sqlite3.connect(database=qajoke_db, timeout=2) as db_conn:
            db_query: str = "SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1"
            async with db_conn.execute(db_query) as cursor:
                row = await cursor.fetchone()
                if not row:
                    return None
                (question, answer) = row
                return (question, answer)

    async def get_rjoke(self) -> Optional[tuple]:
        """
        Get r/joke Joke
        Returns:
            Optional[tuple]: (title, body, score) of a random joke scoring at
                least 100; None if the db is not configured or nothing matched
        """
        rjokes_db: str|LiteralString = self.dbs.get('rjokes', '')
        if not rjokes_db:
            return None
        async with sqlite3.connect(database=rjokes_db, timeout=2) as db_conn:
            db_query: str = "SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1"
            async with db_conn.execute(db_query) as cursor:
                row = await cursor.fetchone()
                if not row:
                    return None
                (title, body, score) = row
                return (title, body, score)

    async def get_random_fact(self) -> str:
        """
        Get Random Fact
        Returns:
            str: The fact, or a "Failed to get a random fact" message when
                both the primary and the backup source fail
        """
        facts_api_url: str = "https://uselessfacts.jsph.pl/api/v2/facts/random"
        facts_backup_url: str = "https://cnichols1734.pythonanywhere.com/facts/random"
        try:
            async with ClientSession() as client:
                try:
                    async with client.get(
                        facts_api_url,
                        timeout=ClientTimeout(connect=5, sock_read=5),
                    ) as request:
                        _json: dict = await request.json()
                        fact: str = _json.get('text', None)
                        if not fact:
                            raise ValueError("RandFact Src 1 Failed")
                        return fact
                # `except Exception` (not bare) so task cancellation, which is
                # a BaseException, still propagates instead of being retried.
                except Exception:
                    async with client.get(
                        facts_backup_url,
                        timeout=ClientTimeout(connect=5, sock_read=5),
                    ) as request:
                        _json = await request.json()
                        fact = _json.get('fact', None)
                        if not fact:
                            raise ValueError("RandFact Src 2 Failed")
                        return fact
        except Exception as e:
            traceback.print_exc()
            return f"Failed to get a random fact :( [{str(e)}]"

    async def get_cookie(self) -> Optional[dict]:
        """
        Get Cookie
        Returns:
            Optional[dict]: Keys 'name', 'origin', 'image_url'; None if the
                db is not configured or has no rows
        """
        cookies_db = self.dbs.get('cookies', '')
        if not cookies_db:
            return None
        async with sqlite3.connect(cookies_db, timeout=2) as db_conn:
            db_query: str = "SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1"
            async with db_conn.execute(db_query) as db_cursor:
                row = await db_cursor.fetchone()
                if not row:
                    return None
                (name, origin, image_url) = row
                return {
                    'name': name,
                    'origin': origin,
                    'image_url': image_url
                }

    def get_coffee(self, recipient_allergic: Optional[bool] = False) -> Optional[str]:
        """
        Get Coffee
        Args:
            recipient_allergic (bool): Is the recipient allergic? (so we know when to keep our nuts out of it)
        Returns:
            Optional[str]: A coffee not among the last 5 served (and without
                "nut" in its name for allergic recipients); None if nothing
                qualifies or an error occurs
        """
        try:
            # Filter first and draw once: every eligible coffee is equally
            # likely, the allergy flag is always honoured, and there is no
            # unbounded retry.
            candidates: list = [
                coffee for coffee in self.COFFEES
                if coffee not in self.LAST_5_COFFEES
                and not (recipient_allergic and "nut" in coffee.lower())
            ]
            if not candidates:
                return None
            randomCoffee: str = random.choice(candidates)
            self.LAST_5_COFFEES.append(randomCoffee)
            # Store no more than 5 of the last served coffees (drop the oldest)
            del self.LAST_5_COFFEES[:-5]
            return randomCoffee
        except Exception:
            traceback.print_exc()
            return None

    def get_days_to_xmas(self) -> Optional[tuple]:
        """
        Get # of Days until Xmas
        Returns:
            Optional[tuple]: (days, hours, minutes, seconds, milliseconds,
                microseconds) until the next 25 December 00:00 UTC
        """
        today: datetime.datetime = datetime.datetime.now(tz=pytz.UTC)
        xmas: datetime.datetime = datetime.datetime(
            year=today.year,
            month=12,
            day=25,
            tzinfo=pytz.UTC,
        )
        if xmas < today:
            # This year's Christmas has passed; count down to next year's
            # rather than producing a negative timedelta.
            xmas = xmas.replace(year=today.year + 1)
        td: datetime.timedelta = (xmas - today)
        # tdTuple yields microseconds before milliseconds; swap on the way out.
        days, hours, minutes, seconds, us, ms = self.tdTuple(td)
        return (days, hours, minutes, seconds, ms, us)

    async def get_randmsg(self) -> Optional[str]:
        """
        Get Random Message from randmsg.db
        Returns:
            Optional[str]: None if the db is not configured or has no rows
        """
        randmsg_db: str|LiteralString = self.dbs.get('randmsg', '')
        if not randmsg_db:
            return None
        async with sqlite3.connect(database=randmsg_db, timeout=2) as db_conn:
            db_query: str = "SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1"
            async with db_conn.execute(db_query) as db_cursor:
                row = await db_cursor.fetchone()
                if not row:
                    return None
                (result,) = row
                return result