cleanup (moved misc_utils to utils), changed sing footer.
This commit is contained in:
453
util/misc_util.py
Normal file
453
util/misc_util.py
Normal file
@ -0,0 +1,453 @@
|
||||
import os
|
||||
import logging
|
||||
import traceback
|
||||
import random
|
||||
import datetime
|
||||
import pytz
|
||||
from typing import Optional, LiteralString, Union
|
||||
import regex
|
||||
import aiosqlite as sqlite3
|
||||
from aiohttp import ClientSession, ClientTimeout
|
||||
from bohancompliment import ComplimentGenerator
|
||||
from discord import Embed
|
||||
|
||||
class Util:
|
||||
"""Misc Utility"""
|
||||
def __init__(self) -> None:
|
||||
self.URL_URBANDICTIONARY: str = "http://api.urbandictionary.com/v0/define"
|
||||
self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult"
|
||||
self.COMPLIMENT_GENERATOR = ComplimentGenerator()
|
||||
self.dbs: dict[str, str|LiteralString] = {
|
||||
'whisky': os.path.join("/usr/local/share",
|
||||
"sqlite_dbs", "whiskey.db"),
|
||||
'drinks': os.path.join("/usr/local/share",
|
||||
"sqlite_dbs", "cocktails.db"),
|
||||
'strains': os.path.join("/usr/local/share",
|
||||
"sqlite_dbs", "strains.db"),
|
||||
'qajoke': os.path.join("/usr/local/share",
|
||||
"sqlite_dbs", "qajoke.db"),
|
||||
'rjokes': os.path.join("/usr/local/share",
|
||||
"sqlite_dbs", "rjokes.db"),
|
||||
'randmsg': os.path.join("/usr/local/share",
|
||||
"sqlite_dbs", "randmsg.db"),
|
||||
'stats': os.path.join("/usr/local/share",
|
||||
"sqlite_dbs", "havoc_stats.db"),
|
||||
'cookies': os.path.join("/usr/local/share",
|
||||
"sqlite_dbs", "cookies.db"),
|
||||
}
|
||||
self.COFFEES: list = ['a cup of french-pressed coffee', 'a cup of cold brew', 'a cup of flash brew',
|
||||
'a cup of Turkish coffee', 'a cup of Moka', 'an espresso',
|
||||
'a cup of Nescafe coffee',
|
||||
'an iced coffee', 'a Frappé', 'a freddo cappuccino',
|
||||
'a cup of Chock full o\'Nuts', 'a cup of Folgers', 'a cup of Lavazza',
|
||||
'a cup of Maxwell House', 'a cup of Moccona', 'a cup of Mr. Brown Coffee',
|
||||
'a cup of affogato al caffè',
|
||||
'a cup of Caffè Medici', 'a cup of Café Touba',
|
||||
'a double-double', 'an indian filter coffee', 'a cup of pocillo',
|
||||
'a cup of caffè americano', 'a cup of caffè lungo', 'a latte', 'a manilo',
|
||||
'a flat white', 'a cup of café cubano', 'a cup of caffè crema',
|
||||
'a cup of cafe zorro', 'an espresso roberto', 'an espresso romano',
|
||||
'an espresso sara', 'a guillermo', 'a ristretto', 'a cup of melya',
|
||||
'a cup of caffè marocchino', 'a cup of café miel', 'a cup of café de olla',
|
||||
'a Mazagran', 'a Palazzo', 'an ice shot', 'a macchiato',
|
||||
'a cortado', 'a red eye', 'a cappuccino',
|
||||
'a mocha', 'a café au lait', 'a bicerin',
|
||||
'a caffè corretto', 'a ca phe trung', 'a café bombón', 'a Vienna coffee',
|
||||
'a flat black', 'a lungo', 'a doppio', 'a ristretto bianco', 'a piccolo latte',
|
||||
'a gibraltar', 'a breve', 'a café con leche', 'a su café', 'a café del tiempo',
|
||||
'a java chip frappuccino', 'a pumpkin spice latte', 'a caramel macchiato',
|
||||
'a white chocolate mocha', 'a hazelnut coffee', 'a toffee nut latte',
|
||||
'a peppermint mocha', 'a cinnamon dolce latte', 'a coconut milk latte',
|
||||
'an almond milk cappuccino', 'an oat milk latte', 'a caramel frappuccino',
|
||||
'a chocolate frappuccino', 'a butter pecan coffee', 'a maple pecan latte',
|
||||
'a sea salt caramel mocha', 'a nitro cold brew', 'a pumpkin cold brew',
|
||||
'a honey almond flat white', 'a sweet cream cold brew', 'a matcha latte',
|
||||
'a golden latte', 'a turmeric latte', 'a beetroot latte', 'a kopi luwak']
|
||||
self.LAST_5_COFFEES: list = []
|
||||
|
||||
|
||||
def tdTuple(self, td:datetime.timedelta) -> tuple:
|
||||
"""
|
||||
Create TimeDelta Tuple
|
||||
|
||||
Args:
|
||||
td (datetime.timedelta)
|
||||
Returns:
|
||||
tuple
|
||||
|
||||
"""
|
||||
def _t(t, n):
|
||||
if t < n:
|
||||
return (t, 0)
|
||||
v = t//n
|
||||
return (t - (v * n), v)
|
||||
(s, h) = _t(td.seconds, 3600)
|
||||
(s, m) = _t(s, 60)
|
||||
(mics, mils) = _t(td.microseconds, 1000)
|
||||
return (td.days, h, m, s, mics, mils)
|
||||
|
||||
async def get_counter(self, counter: Optional[str] = None) -> Optional[dict]:
|
||||
"""
|
||||
Get Counter
|
||||
|
||||
Args:
|
||||
counter (Optional[str])
|
||||
Returns:
|
||||
Optional[dict]
|
||||
|
||||
"""
|
||||
stats_db: str|LiteralString = self.dbs.get('stats', '')
|
||||
if not stats_db:
|
||||
return None
|
||||
async with sqlite3.connect(stats_db,
|
||||
timeout=3) as db_conn:
|
||||
db_conn.row_factory = sqlite3.Row
|
||||
query: str = "SELECT ? FROM stats LIMIT 1"
|
||||
if not counter:
|
||||
query = "SELECT * FROM stats LIMIT 1"
|
||||
async with await db_conn.execute(query, (counter,) if counter else None) as db_cursor:
|
||||
result = await db_cursor.fetchone()
|
||||
return result
|
||||
|
||||
async def get_stats_embed(self) -> Optional[Embed]:
|
||||
"""
|
||||
Get Stats Embed
|
||||
|
||||
Returns:
|
||||
Optional[Embed]
|
||||
|
||||
"""
|
||||
counters: Optional[dict] = await self.get_counter()
|
||||
if not counters:
|
||||
return None
|
||||
embed: Embed = Embed(title="Stats")
|
||||
counter_message: str = ""
|
||||
counters_sorted: dict = dict(sorted(counters.items(),
|
||||
key=lambda item: item[1], reverse=True))
|
||||
for counter, value in counters_sorted.items():
|
||||
counter = regex.sub(r'_', ' ',
|
||||
counter.strip()).title()
|
||||
counter_message += f"- {value} {counter}\n"
|
||||
embed.description = counter_message.strip()
|
||||
return embed
|
||||
|
||||
async def increment_counter(self, counter: str) -> bool:
|
||||
"""
|
||||
Increment Counter
|
||||
|
||||
Args:
|
||||
counter (str)
|
||||
Returns:
|
||||
bool
|
||||
|
||||
"""
|
||||
stats_db: str|LiteralString = self.dbs.get('stats', '')
|
||||
if not stats_db:
|
||||
return False
|
||||
async with sqlite3.connect(stats_db,
|
||||
timeout=3) as db_conn:
|
||||
async with await db_conn.execute(f"UPDATE stats SET {counter} = {counter} + 1") as db_cursor:
|
||||
if db_cursor.rowcount < 0:
|
||||
logging.critical("[karma::increment_counter] Fail! %s", db_cursor.rowcount)
|
||||
return False
|
||||
await db_conn.commit()
|
||||
return True
|
||||
|
||||
async def get_ud_def(self, term: str) -> tuple[str, str]:
|
||||
"""
|
||||
Get Definition from UD
|
||||
|
||||
Args:
|
||||
term (str)
|
||||
Returns:
|
||||
tuple[str, str]
|
||||
|
||||
"""
|
||||
try:
|
||||
async with ClientSession() as session:
|
||||
async with await session.get(self.URL_URBANDICTIONARY,
|
||||
params={
|
||||
"term": term,
|
||||
},
|
||||
headers = {
|
||||
'content-type': 'application/json; charset=utf-8',
|
||||
}, timeout=ClientTimeout(connect=5, sock_read=5)) as request:
|
||||
logging.debug("UD returned: %s",
|
||||
await request.text())
|
||||
data: dict = await request.json()
|
||||
if "list" in data:
|
||||
definitions: list[dict] = data["list"]
|
||||
if definitions:
|
||||
definition: dict = definitions[0]
|
||||
definition_word: str = definition.get("word", "N/A")
|
||||
definition_text: str = regex.sub(r'(\r|\n|\r\n)', ' ', definition["definition"].strip())
|
||||
return (definition_word, definition_text) # Tuple: Returned word, returned definition
|
||||
else:
|
||||
return (term, "Not found!")
|
||||
else:
|
||||
return (term, "Error retrieving data from Urban Dictionary")
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
return (term, f"ERR: {str(e)}")
|
||||
|
||||
async def get_insult(self, recipient: str) -> str:
|
||||
"""
|
||||
Get Insult
|
||||
|
||||
Args:
|
||||
recipient (str)
|
||||
Returns:
|
||||
str
|
||||
|
||||
"""
|
||||
async with ClientSession() as session:
|
||||
async with await session.get(f"{self.URL_INSULTAPI}?who={recipient}") as request:
|
||||
request.raise_for_status()
|
||||
return await request.text()
|
||||
|
||||
|
||||
async def get_compliment(self, subject: str,
|
||||
language: Optional[str] = 'en') -> str:
|
||||
"""
|
||||
Get Compliment
|
||||
|
||||
Args:
|
||||
subject (str)
|
||||
language (Optional[str]) (default: 'en')
|
||||
Returns:
|
||||
str
|
||||
|
||||
"""
|
||||
return self.COMPLIMENT_GENERATOR.compliment_in_language(subject, language)
|
||||
|
||||
async def get_whisky(self) -> Optional[tuple]:
|
||||
"""
|
||||
Get Whisky
|
||||
|
||||
Returns:
|
||||
Optional[tuple]
|
||||
|
||||
"""
|
||||
whisky_db: str|LiteralString = self.dbs.get('whisky', '')
|
||||
if not whisky_db:
|
||||
return None
|
||||
async with sqlite3.connect(database=whisky_db,
|
||||
timeout=2) as db_conn:
|
||||
db_query: str = "SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1"
|
||||
async with await db_conn.execute(db_query) as db_cursor:
|
||||
db_result: Optional[Union[sqlite3.Row, tuple]] = await db_cursor.fetchone()
|
||||
if not db_result:
|
||||
return None
|
||||
(name, category, description) = db_result
|
||||
name = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
|
||||
regex.sub(r'\p{White_Space}{2,}', ' ',
|
||||
name.strip()))
|
||||
category = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
|
||||
regex.sub(r'\p{White_Space}{2,}', ' ',
|
||||
category.strip()))
|
||||
description = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
|
||||
regex.sub(r'\p{White_Space}{2,}', ' ',
|
||||
description.strip()))
|
||||
return (name, category, description)
|
||||
|
||||
async def get_drink(self) -> Optional[tuple]:
|
||||
"""
|
||||
Get Drink
|
||||
|
||||
Returns:
|
||||
Optional[tuple]
|
||||
|
||||
"""
|
||||
drinks_db: str|LiteralString = self.dbs.get('drinks', '')
|
||||
if not drinks_db:
|
||||
return None
|
||||
async with sqlite3.connect(database=drinks_db,
|
||||
timeout=2) as db_conn:
|
||||
db_query: str = "SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1"
|
||||
async with await db_conn.execute(db_query) as db_cursor:
|
||||
db_result: tuple = await db_cursor.fetchone()
|
||||
|
||||
(name, ingredients) = db_result
|
||||
name = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', name.strip()))
|
||||
ingredients = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', ingredients.strip()))
|
||||
ingredients = regex.sub(r'\*', '\u2731', ingredients.strip())
|
||||
return (name, ingredients)
|
||||
|
||||
async def get_strain(self, strain: Optional[str] = None) -> Optional[tuple]:
|
||||
"""
|
||||
Get Strain
|
||||
|
||||
Args:
|
||||
strain (Optional[str])
|
||||
Returns:
|
||||
Optional[tuple]
|
||||
|
||||
"""
|
||||
strains_db: str|LiteralString = self.dbs.get('strains', '')
|
||||
if not strains_db:
|
||||
return None
|
||||
async with sqlite3.connect(database=strains_db,
|
||||
timeout=2) as db_conn:
|
||||
db_params: Optional[tuple] = None
|
||||
if not strain:
|
||||
db_query: str = "SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1"
|
||||
else:
|
||||
db_query = "SELECT name, description FROM strains_w_desc WHERE name LIKE ?"
|
||||
db_params = (f"%{strain.strip()}%",)
|
||||
async with await db_conn.execute(db_query, db_params) as db_cursor:
|
||||
db_result: Optional[tuple] = await db_cursor.fetchone()
|
||||
return db_result
|
||||
|
||||
async def get_qajoke(self) -> Optional[tuple]:
|
||||
"""
|
||||
Get QA Joke
|
||||
|
||||
Returns:
|
||||
Optional[tuple]
|
||||
|
||||
"""
|
||||
qajoke_db: str|LiteralString = self.dbs.get('qajoke', '')
|
||||
if not qajoke_db:
|
||||
return None
|
||||
async with sqlite3.connect(database=qajoke_db,
|
||||
timeout=2) as db_conn:
|
||||
db_query: str = "SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1"
|
||||
async with await db_conn.execute(db_query) as cursor:
|
||||
(question, answer) = await cursor.fetchone()
|
||||
return (question, answer)
|
||||
return None
|
||||
|
||||
async def get_rjoke(self) -> Optional[tuple]:
|
||||
"""
|
||||
Get r/joke Joke
|
||||
|
||||
Returns:
|
||||
Optional[tuple]
|
||||
|
||||
"""
|
||||
rjokes_db: str|LiteralString = self.dbs.get('rjokes', '')
|
||||
if not rjokes_db:
|
||||
return None
|
||||
async with sqlite3.connect(database=rjokes_db, timeout=2) as db_conn:
|
||||
db_query: str = "SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1'"
|
||||
async with await db_conn.execute(db_query) as cursor:
|
||||
(title, body, score) = await cursor.fetchone()
|
||||
return (title, body, score)
|
||||
return None
|
||||
|
||||
|
||||
async def get_random_fact(self) -> str:
|
||||
"""
|
||||
Get Random Fact
|
||||
|
||||
Returns:
|
||||
str
|
||||
|
||||
"""
|
||||
try:
|
||||
facts_api_url: str = "https://uselessfacts.jsph.pl/api/v2/facts/random"
|
||||
facts_backup_url: str = "https://cnichols1734.pythonanywhere.com/facts/random"
|
||||
async with ClientSession() as client:
|
||||
try:
|
||||
async with await client.get(facts_api_url,
|
||||
timeout=ClientTimeout(connect=5, sock_read=5)) as request:
|
||||
_json: dict = await request.json()
|
||||
fact: str = _json.get('text', None)
|
||||
if not fact:
|
||||
raise BaseException("RandFact Src 1 Failed")
|
||||
return fact
|
||||
except:
|
||||
async with await client.get(facts_backup_url,
|
||||
timeout=ClientTimeout(connect=5, sock_read=5)) as request:
|
||||
_json = await request.json()
|
||||
fact = _json.get('fact', None)
|
||||
return fact
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
return f"Failed to get a random fact :( [{str(e)}]"
|
||||
|
||||
async def get_cookie(self) -> Optional[dict]:
|
||||
"""
|
||||
Get Cookie
|
||||
|
||||
Returns:
|
||||
Optional[dict]
|
||||
|
||||
"""
|
||||
cookies_db = self.dbs.get('cookies', '')
|
||||
if not cookies_db:
|
||||
return None
|
||||
async with sqlite3.connect(cookies_db,
|
||||
timeout=2) as db_conn:
|
||||
db_query: str = "SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1"
|
||||
async with await db_conn.execute(db_query) as db_cursor:
|
||||
(name, origin, image_url) = await db_cursor.fetchone()
|
||||
return {
|
||||
'name': name,
|
||||
'origin': origin,
|
||||
'image_url': image_url
|
||||
}
|
||||
|
||||
|
||||
def get_coffee(self,
|
||||
recipient_allergic: Optional[bool] = False) -> Optional[str]:
|
||||
"""
|
||||
Get Coffee
|
||||
|
||||
Args:
|
||||
recipient_allergic (bool): Is the recipient allergic? (so we know when to keep our nuts out of it)
|
||||
|
||||
Returns:
|
||||
str
|
||||
|
||||
"""
|
||||
try:
|
||||
randomCoffee: str = random.choice(self.COFFEES)
|
||||
if self.LAST_5_COFFEES and randomCoffee in self.LAST_5_COFFEES\
|
||||
or (recipient_allergic and "nut" in randomCoffee.lower()):
|
||||
return self.get_coffee() # Recurse
|
||||
if len(self.LAST_5_COFFEES) >= 5:
|
||||
self.LAST_5_COFFEES.pop() # Store no more than 5 of the last served coffees
|
||||
self.LAST_5_COFFEES.append(randomCoffee)
|
||||
return randomCoffee
|
||||
except:
|
||||
traceback.print_exc()
|
||||
return None
|
||||
|
||||
def get_days_to_xmas(self) -> Optional[tuple]:
|
||||
"""
|
||||
Get # of Days until Xmas
|
||||
|
||||
Returns:
|
||||
Optional[tuple]
|
||||
|
||||
"""
|
||||
today: datetime.datetime = datetime.datetime.now(tz=pytz.UTC)
|
||||
xmas: datetime.datetime = datetime.datetime(
|
||||
year=today.year,
|
||||
month=12,
|
||||
day=25,
|
||||
tzinfo=pytz.UTC,
|
||||
)
|
||||
td: datetime.timedelta = (xmas - today)
|
||||
days, hours, minutes, seconds, us, ms = self.tdTuple(td)
|
||||
|
||||
return (days, hours, minutes, seconds, ms, us)
|
||||
|
||||
async def get_randmsg(self) -> Optional[str]:
|
||||
"""
|
||||
Get Random Message from randmsg.db
|
||||
|
||||
Returns:
|
||||
Optional[str]
|
||||
|
||||
"""
|
||||
randmsg_db: str|LiteralString = self.dbs.get('randmsg', '')
|
||||
if not randmsg_db:
|
||||
return None
|
||||
async with sqlite3.connect(database=randmsg_db,
|
||||
timeout=2) as db_conn:
|
||||
db_query: str = "SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1"
|
||||
async with await db_conn.execute(db_query) as db_cursor:
|
||||
(result,) = await db_cursor.fetchone()
|
||||
return result
|
Reference in New Issue
Block a user