546 lines
18 KiB
Python
546 lines
18 KiB
Python
import os
|
|
import logging
|
|
import traceback
|
|
import random
|
|
import datetime
|
|
import pytz
|
|
from typing import Optional, LiteralString, Union
|
|
import regex
|
|
import aiosqlite as sqlite3
|
|
from aiohttp import ClientSession, ClientTimeout
|
|
from bohancompliment import ComplimentGenerator
|
|
from discord import Embed
|
|
|
|
|
|
class Util:
|
|
"""Misc Utility"""
|
|
|
|
def __init__(self) -> None:
    """
    Initialise the utility helper

    Stores the remote API endpoints, instantiates the compliment generator,
    builds the lookup table of SQLite database paths and defines the coffee
    menu together with the (initially empty) list of recently served coffees.
    """
    # Remote endpoints
    self.URL_URBANDICTIONARY: str = "http://api.urbandictionary.com/v0/define"
    self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult"
    self.COMPLIMENT_GENERATOR = ComplimentGenerator()

    # Every database file sits in the same directory; only the file name differs.
    db_files: tuple = (
        ("whisky", "whiskey.db"),
        ("drinks", "cocktails.db"),
        ("strains", "strains.db"),
        ("qajoke", "qajoke.db"),
        ("rjokes", "rjokes.db"),
        ("randmsg", "randmsg.db"),
        ("stats", "havoc_stats.db"),
        ("cookies", "cookies.db"),
    )
    self.dbs: dict[str, str | LiteralString] = {
        db_key: os.path.join("/usr/local/share", "sqlite_dbs", db_file)
        for db_key, db_file in db_files
    }

    # Menu that get_coffee() draws from
    self.COFFEES: list = [
        "a cup of french-pressed coffee",
        "a cup of cold brew",
        "a cup of flash brew",
        "a cup of Turkish coffee",
        "a cup of Moka",
        "an espresso",
        "a cup of Nescafe coffee",
        "an iced coffee",
        "a Frappé",
        "a freddo cappuccino",
        "a cup of Chock full o'Nuts",
        "a cup of Folgers",
        "a cup of Lavazza",
        "a cup of Maxwell House",
        "a cup of Moccona",
        "a cup of Mr. Brown Coffee",
        "a cup of affogato al caffè",
        "a cup of Caffè Medici",
        "a cup of Café Touba",
        "a double-double",
        "an indian filter coffee",
        "a cup of pocillo",
        "a cup of caffè americano",
        "a cup of caffè lungo",
        "a latte",
        "a manilo",
        "a flat white",
        "a cup of café cubano",
        "a cup of caffè crema",
        "a cup of cafe zorro",
        "an espresso roberto",
        "an espresso romano",
        "an espresso sara",
        "a guillermo",
        "a ristretto",
        "a cup of melya",
        "a cup of caffè marocchino",
        "a cup of café miel",
        "a cup of café de olla",
        "a Mazagran",
        "a Palazzo",
        "an ice shot",
        "a macchiato",
        "a cortado",
        "a red eye",
        "a cappuccino",
        "a mocha",
        "a café au lait",
        "a bicerin",
        "a caffè corretto",
        "a ca phe trung",
        "a café bombón",
        "a Vienna coffee",
        "a flat black",
        "a lungo",
        "a doppio",
        "a ristretto bianco",
        "a piccolo latte",
        "a gibraltar",
        "a breve",
        "a café con leche",
        "a su café",
        "a café del tiempo",
        "a java chip frappuccino",
        "a pumpkin spice latte",
        "a caramel macchiato",
        "a white chocolate mocha",
        "a hazelnut coffee",
        "a toffee nut latte",
        "a peppermint mocha",
        "a cinnamon dolce latte",
        "a coconut milk latte",
        "an almond milk cappuccino",
        "an oat milk latte",
        "a caramel frappuccino",
        "a chocolate frappuccino",
        "a butter pecan coffee",
        "a maple pecan latte",
        "a sea salt caramel mocha",
        "a nitro cold brew",
        "a pumpkin cold brew",
        "a honey almond flat white",
        "a sweet cream cold brew",
        "a matcha latte",
        "a golden latte",
        "a turmeric latte",
        "a beetroot latte",
        "a kopi luwak",
    ]
    # Recently served coffees, consulted by get_coffee() to avoid repeats
    self.LAST_5_COFFEES: list = []
|
|
|
|
def tdTuple(self, td: datetime.timedelta) -> tuple:
    """
    Split a TimeDelta into its component units

    Args:
        td (datetime.timedelta)
    Returns:
        tuple: (days, hours, minutes, seconds, microseconds, milliseconds)
            Note the ordering of the last two members: the sub-millisecond
            remainder comes first, the whole milliseconds second.

    """
    # td.seconds / td.microseconds are always non-negative, so divmod is safe here
    hours, leftover_seconds = divmod(td.seconds, 3600)
    minutes, seconds = divmod(leftover_seconds, 60)
    milliseconds, microseconds = divmod(td.microseconds, 1000)
    return (td.days, hours, minutes, seconds, microseconds, milliseconds)
|
|
|
|
async def get_counter(self, counter: Optional[str] = None) -> Optional[dict]:
|
|
"""
|
|
Get Counter
|
|
|
|
Args:
|
|
counter (Optional[str])
|
|
Returns:
|
|
Optional[dict]
|
|
|
|
"""
|
|
stats_db: str | LiteralString = self.dbs.get("stats", "")
|
|
if not stats_db:
|
|
return None
|
|
async with sqlite3.connect(stats_db, timeout=3) as db_conn:
|
|
db_conn.row_factory = sqlite3.Row
|
|
query: str = "SELECT ? FROM stats LIMIT 1"
|
|
if not counter:
|
|
query = "SELECT * FROM stats LIMIT 1"
|
|
async with await db_conn.execute(
|
|
query, (counter,) if counter else None
|
|
) as db_cursor:
|
|
result = await db_cursor.fetchone()
|
|
return result
|
|
|
|
async def get_stats_embed(self) -> Optional[Embed]:
    """
    Get Stats Embed

    Builds an embed titled "Stats" listing every counter, highest value
    first, one "- <value> <Counter Name>" line per counter.

    Returns:
        Optional[Embed]: None when no counters could be retrieved

    """
    counters = await self.get_counter()
    if not counters:
        return None

    # The value handed back may be a sqlite3.Row, which exposes keys() but not
    # items(); dict() accepts both a Row and a plain dict.
    counter_map: dict = dict(counters)
    ranked: list = sorted(
        counter_map.items(), key=lambda item: item[1], reverse=True
    )

    lines: list[str] = []
    for name, value in ranked:
        # Column names use snake_case; present them as Title Case words
        pretty_name: str = name.strip().replace("_", " ").title()
        lines.append(f"- {value} {pretty_name}")

    embed: Embed = Embed(title="Stats")
    embed.description = "\n".join(lines).strip()
    return embed
|
|
|
|
async def increment_counter(self, counter: str) -> bool:
|
|
"""
|
|
Increment Counter
|
|
|
|
Args:
|
|
counter (str)
|
|
Returns:
|
|
bool
|
|
|
|
"""
|
|
stats_db: str | LiteralString = self.dbs.get("stats", "")
|
|
if not stats_db:
|
|
return False
|
|
async with sqlite3.connect(stats_db, timeout=3) as db_conn:
|
|
async with await db_conn.execute(
|
|
f"UPDATE stats SET {counter} = {counter} + 1"
|
|
) as db_cursor:
|
|
if db_cursor.rowcount < 0:
|
|
logging.critical(
|
|
"[karma::increment_counter] Fail! %s", db_cursor.rowcount
|
|
)
|
|
return False
|
|
await db_conn.commit()
|
|
return True
|
|
|
|
async def get_ud_def(self, term: str) -> tuple[str, str]:
    """
    Get Definition from UD

    Queries the Urban Dictionary endpoint stored in URL_URBANDICTIONARY and
    returns the first definition in the response.

    Args:
        term (str): the term to look up
    Returns:
        tuple[str, str]: (word, definition). On any failure the first member
            is the requested term and the second is a message: "Not found!",
            "Error retrieving data from Urban Dictionary" or "ERR: <reason>".

    """
    try:
        async with ClientSession() as session:
            async with session.get(
                self.URL_URBANDICTIONARY,
                params={"term": term},
                headers={"content-type": "application/json; charset=utf-8"},
                timeout=ClientTimeout(connect=5, sock_read=5),
            ) as request:
                logging.debug("UD returned: %s", await request.text())
                data: dict = await request.json()

        if "list" not in data:
            return (term, "Error retrieving data from Urban Dictionary")

        definitions: list[dict] = data["list"]
        if not definitions:
            return (term, "Not found!")

        definition: dict = definitions[0]
        definition_word: str = definition.get("word", "N/A")
        # "\r\n" must be tried first, otherwise a Windows line break is
        # replaced by two spaces instead of one.
        definition_text: str = regex.sub(
            r"\r\n|\r|\n", " ", definition["definition"].strip()
        )
        return (definition_word, definition_text)  # Tuple: Returned word, returned definition
    except Exception as e:
        # Best-effort lookup: report the problem to the caller as text
        traceback.print_exc()
        return (term, f"ERR: {str(e)}")
|
|
|
|
async def get_insult(self, recipient: str) -> str:
    """
    Get Insult

    Requests an insult for `recipient` from the endpoint stored in
    URL_INSULTAPI and returns the response body.

    Args:
        recipient (str): passed to the API as the `who` query parameter
    Returns:
        str: response text
    Raises:
        aiohttp.ClientResponseError: for a 4xx/5xx response (raise_for_status)

    """
    async with ClientSession() as session:
        # Pass the recipient via `params` so characters such as "&", "#" or
        # "=" are percent-encoded instead of altering the query string.
        async with session.get(
            self.URL_INSULTAPI,
            params={"who": recipient},
            # Same limits as the other HTTP helpers in this class
            timeout=ClientTimeout(connect=5, sock_read=5),
        ) as request:
            request.raise_for_status()
            return await request.text()
|
|
|
|
async def get_compliment(self, subject: str, language: Optional[str] = "en") -> str:
|
|
"""
|
|
Get Compliment
|
|
|
|
Args:
|
|
subject (str)
|
|
language (Optional[str]) (default: 'en')
|
|
Returns:
|
|
str
|
|
|
|
"""
|
|
return self.COMPLIMENT_GENERATOR.compliment_in_language(subject, language)
|
|
|
|
async def get_whisky(self) -> Optional[tuple]:
|
|
"""
|
|
Get Whisky
|
|
|
|
Returns:
|
|
Optional[tuple]
|
|
|
|
"""
|
|
whisky_db: str | LiteralString = self.dbs.get("whisky", "")
|
|
if not whisky_db:
|
|
return None
|
|
async with sqlite3.connect(database=whisky_db, timeout=2) as db_conn:
|
|
db_query: str = (
|
|
"SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1"
|
|
)
|
|
async with await db_conn.execute(db_query) as db_cursor:
|
|
db_result: Optional[Union[sqlite3.Row, tuple]] = (
|
|
await db_cursor.fetchone()
|
|
)
|
|
if not db_result:
|
|
return None
|
|
(name, category, description) = db_result
|
|
name = regex.sub(
|
|
r"(^\p{White_Space}|\r|\n)",
|
|
"",
|
|
regex.sub(r"\p{White_Space}{2,}", " ", name.strip()),
|
|
)
|
|
category = regex.sub(
|
|
r"(^\p{White_Space}|\r|\n)",
|
|
"",
|
|
regex.sub(r"\p{White_Space}{2,}", " ", category.strip()),
|
|
)
|
|
description = regex.sub(
|
|
r"(^\p{White_Space}|\r|\n)",
|
|
"",
|
|
regex.sub(r"\p{White_Space}{2,}", " ", description.strip()),
|
|
)
|
|
return (name, category, description)
|
|
|
|
async def get_drink(self) -> Optional[tuple]:
|
|
"""
|
|
Get Drink
|
|
|
|
Returns:
|
|
Optional[tuple]
|
|
|
|
"""
|
|
drinks_db: str | LiteralString = self.dbs.get("drinks", "")
|
|
if not drinks_db:
|
|
return None
|
|
async with sqlite3.connect(database=drinks_db, timeout=2) as db_conn:
|
|
db_query: str = (
|
|
"SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1"
|
|
)
|
|
async with await db_conn.execute(db_query) as db_cursor:
|
|
db_result: tuple = await db_cursor.fetchone()
|
|
|
|
(name, ingredients) = db_result
|
|
name = regex.sub(
|
|
r"(^\p{White_Space}|\r|\n)",
|
|
"",
|
|
regex.sub(r"\p{White_Space}{2,}", " ", name.strip()),
|
|
)
|
|
ingredients = regex.sub(
|
|
r"(^\p{White_Space}|\r|\n)",
|
|
"",
|
|
regex.sub(r"\p{White_Space}{2,}", " ", ingredients.strip()),
|
|
)
|
|
ingredients = regex.sub(r"\*", "\u2731", ingredients.strip())
|
|
return (name, ingredients)
|
|
|
|
async def get_strain(self, strain: Optional[str] = None) -> Optional[tuple]:
|
|
"""
|
|
Get Strain
|
|
|
|
Args:
|
|
strain (Optional[str])
|
|
Returns:
|
|
Optional[tuple]
|
|
|
|
"""
|
|
strains_db: str | LiteralString = self.dbs.get("strains", "")
|
|
if not strains_db:
|
|
return None
|
|
async with sqlite3.connect(database=strains_db, timeout=2) as db_conn:
|
|
db_params: Optional[tuple] = None
|
|
if not strain:
|
|
db_query: str = (
|
|
"SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1"
|
|
)
|
|
else:
|
|
db_query = (
|
|
"SELECT name, description FROM strains_w_desc WHERE name LIKE ?"
|
|
)
|
|
db_params = (f"%{strain.strip()}%",)
|
|
async with await db_conn.execute(db_query, db_params) as db_cursor:
|
|
db_result: Optional[tuple] = await db_cursor.fetchone()
|
|
return db_result
|
|
|
|
async def get_qajoke(self) -> Optional[tuple]:
|
|
"""
|
|
Get QA Joke
|
|
|
|
Returns:
|
|
Optional[tuple]
|
|
|
|
"""
|
|
qajoke_db: str | LiteralString = self.dbs.get("qajoke", "")
|
|
if not qajoke_db:
|
|
return None
|
|
async with sqlite3.connect(database=qajoke_db, timeout=2) as db_conn:
|
|
db_query: str = (
|
|
"SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1"
|
|
)
|
|
async with await db_conn.execute(db_query) as cursor:
|
|
(question, answer) = await cursor.fetchone()
|
|
return (question, answer)
|
|
return None
|
|
|
|
async def get_rjoke(self) -> Optional[tuple]:
|
|
"""
|
|
Get r/joke Joke
|
|
|
|
Returns:
|
|
Optional[tuple]
|
|
|
|
"""
|
|
rjokes_db: str | LiteralString = self.dbs.get("rjokes", "")
|
|
if not rjokes_db:
|
|
return None
|
|
async with sqlite3.connect(database=rjokes_db, timeout=2) as db_conn:
|
|
db_query: str = (
|
|
"SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1'"
|
|
)
|
|
async with await db_conn.execute(db_query) as cursor:
|
|
(title, body, score) = await cursor.fetchone()
|
|
return (title, body, score)
|
|
return None
|
|
|
|
async def get_random_fact(self) -> str:
    """
    Get Random Fact

    Tries the primary facts API first (reading the "text" field) and falls
    back to the backup API (reading the "fact" field) when the primary
    request fails or returns no fact.

    Returns:
        str: the fact, or a "Failed to get a random fact" message when both
            sources fail

    """
    facts_api_url: str = "https://uselessfacts.jsph.pl/api/v2/facts/random"
    facts_backup_url: str = "https://cnichols1734.pythonanywhere.com/facts/random"
    request_timeout = ClientTimeout(connect=5, sock_read=5)

    try:
        async with ClientSession() as client:
            try:
                async with client.get(
                    facts_api_url, timeout=request_timeout
                ) as request:
                    _json: dict = await request.json()
                    fact: Optional[str] = _json.get("text")
                    if not fact:
                        raise ValueError("RandFact Src 1 Failed")
                    return fact
            except Exception:
                # `Exception` rather than a bare except: task cancellation and
                # KeyboardInterrupt must not be turned into a fallback request.
                async with client.get(
                    facts_backup_url, timeout=request_timeout
                ) as request:
                    _json = await request.json()
                    fact = _json.get("fact")
                    if not fact:
                        # Keep the declared `str` return type honest
                        raise ValueError("RandFact Src 2 Failed")
                    return fact
    except Exception as e:
        traceback.print_exc()
        return f"Failed to get a random fact :( [{str(e)}]"
|
|
|
|
async def get_cookie(self) -> Optional[dict]:
|
|
"""
|
|
Get Cookie
|
|
|
|
Returns:
|
|
Optional[dict]
|
|
|
|
"""
|
|
cookies_db = self.dbs.get("cookies", "")
|
|
if not cookies_db:
|
|
return None
|
|
async with sqlite3.connect(cookies_db, timeout=2) as db_conn:
|
|
db_query: str = (
|
|
"SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1"
|
|
)
|
|
async with await db_conn.execute(db_query) as db_cursor:
|
|
(name, origin, image_url) = await db_cursor.fetchone()
|
|
return {"name": name, "origin": origin, "image_url": image_url}
|
|
|
|
def get_coffee(self, recipient_allergic: Optional[bool] = False) -> Optional[str]:
|
|
"""
|
|
Get Coffee
|
|
|
|
Args:
|
|
recipient_allergic (bool): Is the recipient allergic? (so we know when to keep our nuts out of it)
|
|
|
|
Returns:
|
|
str
|
|
|
|
"""
|
|
try:
|
|
randomCoffee: str = random.choice(self.COFFEES)
|
|
if (
|
|
self.LAST_5_COFFEES
|
|
and randomCoffee in self.LAST_5_COFFEES
|
|
or (recipient_allergic and "nut" in randomCoffee.lower())
|
|
):
|
|
return self.get_coffee() # Recurse
|
|
if len(self.LAST_5_COFFEES) >= 5:
|
|
self.LAST_5_COFFEES.pop() # Store no more than 5 of the last served coffees
|
|
self.LAST_5_COFFEES.append(randomCoffee)
|
|
return randomCoffee
|
|
except:
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
def get_days_to_xmas(self) -> Optional[tuple]:
|
|
"""
|
|
Get # of Days until Xmas
|
|
|
|
Returns:
|
|
Optional[tuple]
|
|
|
|
"""
|
|
today: datetime.datetime = datetime.datetime.now(tz=pytz.UTC)
|
|
xmas: datetime.datetime = datetime.datetime(
|
|
year=today.year,
|
|
month=12,
|
|
day=25,
|
|
tzinfo=pytz.UTC,
|
|
)
|
|
td: datetime.timedelta = xmas - today
|
|
days, hours, minutes, seconds, us, ms = self.tdTuple(td)
|
|
|
|
return (days, hours, minutes, seconds, ms, us)
|
|
|
|
async def get_randmsg(self) -> Optional[str]:
|
|
"""
|
|
Get Random Message from randmsg.db
|
|
|
|
Returns:
|
|
Optional[str]
|
|
|
|
"""
|
|
randmsg_db: str | LiteralString = self.dbs.get("randmsg", "")
|
|
if not randmsg_db:
|
|
return None
|
|
async with sqlite3.connect(database=randmsg_db, timeout=2) as db_conn:
|
|
db_query: str = "SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1"
|
|
async with await db_conn.execute(db_query) as db_cursor:
|
|
(result,) = await db_cursor.fetchone()
|
|
return result
|