#!/usr/bin/env python3.12
"""Miscellaneous utility helpers (stats counters, web lookups, random picks)."""

import os
import logging
import traceback
import random
import datetime
import pytz
from typing import Optional, LiteralString
import regex
import aiosqlite as sqlite3
from aiohttp import ClientSession, ClientTimeout
from bohancompliment import ComplimentGenerator
from discord import Embed


class Util:
    """Misc Utility"""

    # Counter names are interpolated into SQL as column identifiers (SQLite
    # cannot bind identifiers as parameters), so they must match this pattern.
    _COUNTER_NAME = regex.compile(r'[A-Za-z_][A-Za-z0-9_]*')

    def __init__(self) -> None:
        self.URL_URBANDICTIONARY: str = "http://api.urbandictionary.com/v0/define"
        self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult"
        self.COMPLIMENT_GENERATOR = ComplimentGenerator()
        # Logical database name -> path of the SQLite file backing it.
        self.dbs: dict[str, str | LiteralString] = {
            'whisky': os.path.join("/usr/local/share", "sqlite_dbs", "whiskey.db"),
            'drinks': os.path.join("/usr/local/share", "sqlite_dbs", "cocktails.db"),
            'strains': os.path.join("/usr/local/share", "sqlite_dbs", "strains.db"),
            'qajoke': os.path.join("/usr/local/share", "sqlite_dbs", "qajoke.db"),
            'rjokes': os.path.join("/usr/local/share", "sqlite_dbs", "rjokes.db"),
            'randmsg': os.path.join("/usr/local/share", "sqlite_dbs", "randmsg.db"),
            'stats': os.path.join("/usr/local/share", "sqlite_dbs", "havoc_stats.db"),
            'cookies': os.path.join("/usr/local/share", "sqlite_dbs", "cookies.db"),
        }
        self.COFFEES: list = [
            'a cup of french-pressed coffee',
            'a cup of cold brew',
            'a cup of flash brew',
            'a cup of Turkish coffee',
            'a cup of Moka',
            'an espresso',
            'a cup of Nescafe coffee',
            'an iced coffee',
            'a Frappé',
            'a freddo cappuccino',
            'a cup of Chock full o\'Nuts',
            'a cup of Folgers',
            'a cup of Lavazza',
            'a cup of Maxwell House',
            'a cup of Moccona',
            'a cup of Mr. Brown Coffee',
            'a cup of affogato al caffè',
            'a cup of Caffè Medici',
            'a cup of Café Touba',
            'a double-double',
            'an indian filter coffee',
            'a cup of pocillo',
            'a cup of caffè americano',
            'a cup of caffè lungo',
            'a latte',
            'a manilo',
            'a flat white',
            'a cup of café cubano',
            'a cup of caffè crema',
            'a cup of cafe zorro',
            'an espresso roberto',
            'an espresso romano',
            'an espresso sara',
            'a guillermo',
            'a ristretto',
            'a cup of melya',
            'a cup of caffè marocchino',
            'a cup of café miel',
            'a cup of café de olla',
            'a Mazagran',
            'a Palazzo',
            'an ice shot',
        ]
        # Most recently served coffees, oldest first; capped at 5 by get_coffee().
        self.LAST_5_COFFEES: list = []

    @staticmethod
    def _clean_text(text: str) -> str:
        """
        Normalise whitespace in a DB text field
        Strips the ends, collapses runs of 2+ whitespace characters into a
        single space, then drops any remaining CR/LF (and a leading
        whitespace character, should one remain).
        Args:
            text (str)
        Returns:
            str
        """
        collapsed: str = regex.sub(r'\p{White_Space}{2,}', ' ', text.strip())
        return regex.sub(r'(^\p{White_Space}|\r|\n)', '', collapsed)

    def tdTuple(self, td: datetime.timedelta) -> tuple:
        """
        Create TimeDelta Tuple
        Splits the timedelta's `seconds` into hours/minutes/seconds and its
        `microseconds` into whole milliseconds plus leftover microseconds.
        Args:
            td (datetime.timedelta)
        Returns:
            tuple: (days, hours, minutes, seconds, microseconds, milliseconds)
        """
        hours, remainder = divmod(td.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        millis, micros = divmod(td.microseconds, 1000)
        return (td.days, hours, minutes, seconds, micros, millis)

    def sqlite_dict_factory(self, cursor: sqlite3.Cursor, row: sqlite3.Row) -> dict:
        """
        SQLite Dict Factory for Rows Returned
        Maps each column name from cursor.description to the matching value
        of the row.
        Args:
            cursor (sqlite3.Cursor)
            row (sqlite3.Row)
        Returns:
            dict
        """
        fields = [column[0] for column in cursor.description]
        return dict(zip(fields, row))

    async def get_counter(self, counter: Optional[str] = None) -> dict:
        """
        Get Counter
        Reads the first row of the `stats` table, either a single column or
        (when no counter is given) every column.
        Args:
            counter (Optional[str]): column name; must be a plain identifier
        Returns:
            dict: column name -> value (None if the table has no rows)
        Raises:
            ValueError: if counter is not a plain identifier
        """
        if counter:
            # A column name cannot be bound with "?" (it would be returned as
            # a literal string), so validate it and interpolate it instead.
            if not isinstance(counter, str) or not self._COUNTER_NAME.fullmatch(counter):
                raise ValueError(f"Invalid counter name: {counter!r}")
            query: str = f"SELECT {counter} FROM stats LIMIT 1"
        else:
            query = "SELECT * FROM stats LIMIT 1"

        async with sqlite3.connect(self.dbs['stats'], timeout=3) as db_conn:
            db_conn.row_factory = self.sqlite_dict_factory
            async with db_conn.execute(query) as db_cursor:
                result: dict = await db_cursor.fetchone()
                return result

    async def get_stats_embed(self) -> Embed:
        """
        Get Stats Embed
        Builds an embed titled "Stats" listing every counter, highest value
        first, one "- <value> <Counter Name>" line per counter.
        Returns:
            Embed
        """
        # get_counter() yields None when the stats table has no rows.
        counters: dict = await self.get_counter() or {}
        ranked = sorted(counters.items(), key=lambda item: item[1], reverse=True)
        lines: list[str] = [
            f"- {value} {name.strip().replace('_', ' ').title()}"
            for name, value in ranked
        ]
        embed: Embed = Embed(title="Stats")
        embed.description = "\n".join(lines).strip()
        return embed

    async def increment_counter(self, counter: str) -> bool:
        """
        Increment Counter
        Adds 1 to the given column of the `stats` table and commits.
        Args:
            counter (str): column name; must be a plain identifier
        Returns:
            bool: False if the name is invalid or the cursor reports a
                negative rowcount, True otherwise
        """
        # The column name goes straight into the SQL text, so refuse anything
        # that is not a plain identifier.
        if not isinstance(counter, str) or not self._COUNTER_NAME.fullmatch(counter):
            logging.critical("[karma::increment_counter] Fail! Invalid counter name: %r",
                             counter)
            return False

        async with sqlite3.connect(self.dbs['stats'], timeout=3) as db_conn:
            async with db_conn.execute(
                f"UPDATE stats SET {counter} = {counter} + 1"
            ) as db_cursor:
                if db_cursor.rowcount < 0:
                    logging.critical("[karma::increment_counter] Fail! %s",
                                     db_cursor.rowcount)
                    return False
                await db_conn.commit()
                return True

    async def get_ud_def(self, term: Optional[str] = None) -> tuple[str, str]:
        """
        Get Definition from UD
        Queries the Urban Dictionary API and returns the first definition,
        with line breaks replaced by spaces. Any exception is printed and
        reported in the returned text rather than raised.
        Args:
            term (Optional[str])
        Returns:
            tuple[str, str]: (word, definition) on success, otherwise
                (term, message)
        """
        try:
            async with ClientSession() as session:
                async with session.get(
                    self.URL_URBANDICTIONARY,
                    params={"term": term},
                    headers={'content-type': 'application/json; charset=utf-8'},
                    timeout=ClientTimeout(connect=5, sock_read=5),
                ) as request:
                    logging.debug("UD returned: %s", await request.text())
                    data: dict = await request.json()

            if "list" not in data:
                return (term, "Error retrieving data from Urban Dictionary")
            definitions: list[dict] = data["list"]
            if not definitions:
                return (term, "Not found!")

            definition: dict = definitions[0]
            definition_word: str = definition.get("word", "N/A")
            definition_text: str = regex.sub(r'(\r|\n|\r\n)', ' ',
                                             definition["definition"].strip())
            return (definition_word, definition_text)  # Tuple: Returned word, returned definition
        except Exception as e:
            traceback.print_exc()
            return (term, f"ERR: {str(e)}")

    async def get_insult(self, recipient: str) -> str:
        """
        Get Insult
        Requests an insult aimed at the recipient from the insult API.
        Args:
            recipient (str)
        Returns:
            str: response body
        Raises:
            aiohttp.ClientResponseError: on a 4xx/5xx status (raise_for_status)
        """
        async with ClientSession() as session:
            # Pass the name via params so it is URL-encoded; use the same
            # timeout as the other HTTP helpers in this class.
            async with session.get(
                self.URL_INSULTAPI,
                params={"who": recipient},
                timeout=ClientTimeout(connect=5, sock_read=5),
            ) as request:
                request.raise_for_status()
                return await request.text()

    async def get_compliment(self, subject: str,
                             language: Optional[str] = None) -> str:
        """
        Get Compliment
        Args:
            subject (str)
            language (Optional[str]): when given, compliment_in_language is used
        Returns:
            str
        """
        if not language:
            return self.COMPLIMENT_GENERATOR.compliment(subject)
        return self.COMPLIMENT_GENERATOR.compliment_in_language(subject, language)

    async def get_whisky(self) -> tuple:
        """
        Get Whisky
        Picks one random row from `whiskeys` and normalises its whitespace.
        Returns:
            tuple: (name, category, description)
        Raises:
            TypeError: if the query returns no row
        """
        db_query: str = "SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1"
        # async with guarantees the connection is closed again.
        async with sqlite3.connect(database=self.dbs['whisky'], timeout=2) as db_conn:
            async with db_conn.execute(db_query) as db_cursor:
                (name, category, description) = await db_cursor.fetchone()
        return (
            self._clean_text(name),
            self._clean_text(category),
            self._clean_text(description),
        )

    async def get_drink(self) -> tuple:
        """
        Get Drink
        Picks one random row from `cocktails`, normalises its whitespace and
        replaces "*" in the ingredients with U+2731.
        Returns:
            tuple: (name, ingredients)
        Raises:
            TypeError: if the query returns no row
        """
        db_query: str = "SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1"
        async with sqlite3.connect(database=self.dbs['drinks'], timeout=2) as db_conn:
            async with db_conn.execute(db_query) as db_cursor:
                (name, ingredients) = await db_cursor.fetchone()
        name = self._clean_text(name)
        ingredients = self._clean_text(ingredients).strip().replace('*', '\u2731')
        return (name, ingredients)

    async def get_strain(self, strain: Optional[str] = None) -> tuple:
        """
        Get Strain
        Returns a random row from `strains_w_desc`, or the first row whose
        name contains the given text.
        Args:
            strain (Optional[str])
        Returns:
            tuple: (name, description), or None when nothing matched
        """
        db_params: Optional[tuple] = None
        if not strain:
            db_query: str = "SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1"
        else:
            db_query = "SELECT name, description FROM strains_w_desc WHERE name LIKE ?"
            db_params = (f"%{strain.strip()}%",)

        async with sqlite3.connect(database=self.dbs['strains'], timeout=2) as db_conn:
            async with db_conn.execute(db_query, db_params) as db_cursor:
                db_result: tuple = await db_cursor.fetchone()
                return db_result

    async def get_qajoke(self) -> tuple:
        """
        Get QA Joke
        Returns:
            tuple: (question, answer)
        Raises:
            TypeError: if the query returns no row
        """
        async with sqlite3.connect(database=self.dbs['qajoke'], timeout=2) as db:
            async with db.execute(
                'SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1'
            ) as cursor:
                (question, answer) = await cursor.fetchone()
                return (question, answer)

    async def get_rjoke(self) -> tuple:
        """
        Get r/joke Joke
        Returns:
            tuple: (title, body, score)
        Raises:
            TypeError: if the query returns no row
        """
        async with sqlite3.connect(database=self.dbs['rjokes'], timeout=2) as db:
            async with db.execute(
                'SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1'
            ) as cursor:
                (title, body, score) = await cursor.fetchone()
                return (title, body, score)

    async def get_random_fact(self) -> str:
        """
        Get Random Fact
        Tries the primary facts API first; if that request fails or yields no
        text, falls back to the backup API. If the backup fails too, the
        error is printed and a failure message is returned.
        Returns:
            str
        """
        facts_api_url: str = "https://uselessfacts.jsph.pl/api/v2/facts/random"
        facts_backup_url: str = "https://cnichols1734.pythonanywhere.com/facts/random"
        try:
            async with ClientSession() as client:
                try:
                    async with client.get(
                        facts_api_url,
                        timeout=ClientTimeout(connect=5, sock_read=5),
                    ) as request:
                        payload: dict = await request.json()
                        fact: str = payload.get('text')
                        if not fact:
                            raise ValueError("RandFact Src 1 Failed")
                        return fact
                except Exception:
                    # Exception (not a bare except) so that task cancellation
                    # and KeyboardInterrupt are not swallowed by the fallback.
                    async with client.get(
                        facts_backup_url,
                        timeout=ClientTimeout(connect=5, sock_read=5),
                    ) as request:
                        payload = await request.json()
                        return payload.get('fact')
        except Exception as e:
            traceback.print_exc()
            return f"Failed to get a random fact :( [{str(e)}]"

    async def get_cookie(self) -> dict:
        """
        Get Cookie
        Returns:
            dict: keys 'name', 'origin', 'image_url' of one random cookie
        Raises:
            TypeError: if the query returns no row
        """
        async with sqlite3.connect(self.dbs['cookies'], timeout=2) as db_conn:
            async with db_conn.execute(
                "SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1"
            ) as db_cursor:
                (name, origin, image_url) = await db_cursor.fetchone()
                return {
                    'name': name,
                    'origin': origin,
                    'image_url': image_url,
                }

    def get_coffee(self) -> str:
        """
        Get Coffee
        Picks a random coffee that is not among the last 5 served and
        records it in LAST_5_COFFEES.
        Returns:
            str: the coffee; False if picking failed (the error is printed)
        """
        try:
            recent: list = self.LAST_5_COFFEES
            # Fall back to the full menu if every coffee was served recently.
            candidates: list = [c for c in self.COFFEES if c not in recent] or self.COFFEES
            choice: str = random.choice(candidates)
            recent.append(choice)
            del recent[:-5]  # Store no more than 5 of the last served coffees (drop oldest)
            return choice
        except Exception:
            traceback.print_exc()
            return False

    def get_days_to_xmas(self) -> tuple[int | float, ...]:
        """
        Get # of Days until Xmas
        Measures from the current UTC time to the next 25 December 00:00 UTC;
        once this year's has passed, next year's is used.
        Returns:
            tuple[int|float, ...]: (days, hours, minutes, seconds,
                milliseconds, microseconds)
        """
        today: datetime.datetime = datetime.datetime.now(tz=pytz.UTC)
        xmas: datetime.datetime = datetime.datetime(
            year=today.year,
            month=12,
            day=25,
            tzinfo=pytz.UTC,
        )
        if xmas < today:
            # This year's Christmas is already over; count to the next one
            # instead of returning a negative delta.
            xmas = xmas.replace(year=today.year + 1)
        td: datetime.timedelta = xmas - today
        days, hours, minutes, seconds, us, ms = self.tdTuple(td)
        return (days, hours, minutes, seconds, ms, us)

    async def get_randmsg(self) -> str:
        """
        Get Random Message from randmsg.db
        Returns:
            str
        Raises:
            TypeError: if the query returns no row
        """
        async with sqlite3.connect(database=self.dbs['randmsg'], timeout=2) as db_conn:
            async with db_conn.execute(
                "SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1"
            ) as db_cursor:
                (result,) = await db_cursor.fetchone()
                return result