discord-havoc/cogs/misc_util.py

388 lines
16 KiB
Python
Raw Normal View History

2025-02-13 14:51:35 -05:00
#!/usr/bin/env python3.12
import os
import logging
import traceback
import random
import datetime
import pytz
from typing import Optional, LiteralString
import regex
import aiosqlite as sqlite3
from aiohttp import ClientSession, ClientTimeout
from bohancompliment import ComplimentGenerator
from discord import Embed
class Util:
"""Misc Utility"""
def __init__(self) -> None:
self.URL_URBANDICTIONARY: str = "http://api.urbandictionary.com/v0/define"
self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult"
self.COMPLIMENT_GENERATOR = ComplimentGenerator()
self.dbs: dict[str|LiteralString] = {
'whisky': os.path.join("/usr/local/share",
"sqlite_dbs", "whiskey.db"),
'drinks': os.path.join("/usr/local/share",
"sqlite_dbs", "cocktails.db"),
'strains': os.path.join("/usr/local/share",
"sqlite_dbs", "strains.db"),
'qajoke': os.path.join("/usr/local/share",
"sqlite_dbs", "qajoke.db"),
'rjokes': os.path.join("/usr/local/share",
"sqlite_dbs", "rjokes.db"),
'randmsg': os.path.join("/usr/local/share",
"sqlite_dbs", "randmsg.db"),
'stats': os.path.join("/usr/local/share",
"sqlite_dbs", "havoc_stats.db"),
'cookies': os.path.join("/usr/local/share",
"sqlite_dbs", "cookies.db"),
}
self.COFFEES: list = ['a cup of french-pressed coffee', 'a cup of cold brew', 'a cup of flash brew',
'a cup of Turkish coffee', 'a cup of Moka', 'an espresso',
'a cup of Nescafe coffee',
'an iced coffee', 'a Frappé', 'a freddo cappuccino',
'a cup of Chock full o\'Nuts', 'a cup of Folgers', 'a cup of Lavazza',
'a cup of Maxwell House', 'a cup of Moccona', 'a cup of Mr. Brown Coffee',
'a cup of affogato al caffè',
'a cup of Caffè Medici', 'a cup of Café Touba',
'a double-double', 'an indian filter coffee', 'a cup of pocillo',
'a cup of caffè americano', 'a cup of caffè lungo', 'a latte', 'a manilo',
'a flat white', 'a cup of café cubano', 'a cup of caffè crema',
'a cup of cafe zorro', 'an espresso roberto', 'an espresso romano',
'an espresso sara', 'a guillermo', 'a ristretto', 'a cup of melya',
'a cup of caffè marocchino', 'a cup of café miel', 'a cup of café de olla',
'a Mazagran', 'a Palazzo', 'an ice shot']
self.LAST_5_COFFEES: list = []
def tdTuple(self, td:datetime.timedelta) -> tuple:
"""
Create TimeDelta Tuple
Args:
td (datetime.timedelta)
Returns:
tuple
"""
def _t(t, n):
if t < n:
return (t, 0)
v = t//n
return (t - (v * n), v)
(s, h) = _t(td.seconds, 3600)
(s, m) = _t(s, 60)
(mics, mils) = _t(td.microseconds, 1000)
return (td.days, h, m, s, mics, mils)
def sqlite_dict_factory(self, cursor: sqlite3.Cursor, row: sqlite3.Row) -> dict:
"""
SQLite Dict Factory for Rows Returned
Args:
cursor (sqlite3.Row)
row (sqlite3.Row)
Returns:
dict
"""
fields = [column[0] for column in cursor.description]
return { key: value for key, value in zip(fields, row) }
async def get_counter(self, counter: Optional[str] = None) -> dict:
"""
Get Counter
Args:
counter (Optional[str])
Returns:
dict
"""
async with sqlite3.connect(self.dbs.get('stats'),
timeout=3) as db_conn:
db_conn.row_factory = self.sqlite_dict_factory
query: str = "SELECT ? FROM stats LIMIT 1"
if not counter:
query: str = "SELECT * FROM stats LIMIT 1"
async with await db_conn.execute(query, (counter,) if counter else None) as db_cursor:
result: dict = await db_cursor.fetchone()
return result
async def get_stats_embed(self) -> Embed:
"""
Get Stats Embed
Returns:
Embed
"""
counters: dict = await self.get_counter()
embed: Embed = Embed(title="Stats")
counter_message: str = ""
counters_sorted: dict = dict(sorted(counters.items(),
key=lambda item: item[1], reverse=True))
for counter, value in counters_sorted.items():
counter: str = regex.sub(r'_', ' ',
counter.strip()).title()
counter_message += f"- {value} {counter}\n"
embed.description = counter_message.strip()
return embed
async def increment_counter(self, counter: str) -> bool:
"""
Increment Counter
Args:
counter (str)
Returns:
bool
"""
async with sqlite3.connect(self.dbs.get('stats'),
timeout=3) as db_conn:
async with await db_conn.execute(f"UPDATE stats SET {counter} = {counter} + 1") as db_cursor:
if db_cursor.rowcount < 0:
logging.critical("[karma::increment_counter] Fail! %s", db_cursor.rowcount)
return False
await db_conn.commit()
return True
async def get_ud_def(self, term: Optional[str] = None) -> tuple[str, str]:
"""
Get Definition from UD
Args:
term (Optional[str])
Returns:
tuple[str, str]
"""
try:
async with ClientSession() as session:
async with await session.get(self.URL_URBANDICTIONARY,
params={
"term": term,
},
headers = {
'content-type': 'application/json; charset=utf-8',
}, timeout=ClientTimeout(connect=5, sock_read=5)) as request:
logging.debug("UD returned: %s",
await request.text())
data: dict = await request.json()
if "list" in data:
definitions: list[dict] = data["list"]
if definitions:
definition: dict = definitions[0]
definition_word: str = definition.get("word", "N/A")
definition_text: str = regex.sub(r'(\r|\n|\r\n)', ' ', definition["definition"].strip())
return (definition_word, definition_text) # Tuple: Returned word, returned definition
else:
return (term, "Not found!")
else:
return (term, "Error retrieving data from Urban Dictionary")
except Exception as e:
traceback.print_exc()
return (term, f"ERR: {str(e)}")
async def get_insult(self, recipient: str) -> str:
"""
Get Insult
Args:
recipient (str)
Returns:
str
"""
async with ClientSession() as session:
async with await session.get(f"{self.URL_INSULTAPI}?who={recipient}") as request:
request.raise_for_status()
return await request.text()
async def get_compliment(self, subject: str,
language: Optional[str] = None) -> str:
"""
Get Compliment
Args:
subject (str)
language (Optional[str])
Returns:
str
"""
if not language:
return self.COMPLIMENT_GENERATOR.compliment(subject)
return self.COMPLIMENT_GENERATOR.compliment_in_language(subject, language)
async def get_whisky(self) -> tuple:
"""
Get Whisky
Returns:
tuple
"""
whisky_db: str|LiteralString = self.dbs.get('whisky')
db_conn = await sqlite3.connect(database=whisky_db, timeout=2)
db_query: str = "SELECT name, category, description FROM whiskeys ORDER BY random() LIMIT 1"
db_cursor: sqlite3.Cursor = await db_conn.execute(db_query)
db_result: tuple = await db_cursor.fetchone()
(name, category, description) = db_result
name: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
regex.sub(r'\p{White_Space}{2,}', ' ',
name.strip()))
category: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
regex.sub(r'\p{White_Space}{2,}', ' ',
category.strip()))
description: str = regex.sub(r'(^\p{White_Space}|\r|\n)', '',
regex.sub(r'\p{White_Space}{2,}', ' ',
description.strip()))
return (name, category, description)
async def get_drink(self) -> tuple:
"""
Get Drink
Returns:
tuple
"""
drinks_db: str|LiteralString = self.dbs.get('drinks')
db_conn = await sqlite3.connect(database=drinks_db, timeout=2)
db_query: str = "SELECT name, ingredients FROM cocktails ORDER BY random() LIMIT 1"
db_cursor: sqlite3.Cursor = await db_conn.execute(db_query)
db_result: tuple = await db_cursor.fetchone()
(name, ingredients) = db_result
name = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', name.strip()))
ingredients = regex.sub(r'(^\p{White_Space}|\r|\n)', '', regex.sub(r'\p{White_Space}{2,}', ' ', ingredients.strip()))
ingredients = regex.sub(r'\*', '\u2731', ingredients.strip())
return (name, ingredients)
async def get_strain(self, strain: Optional[str] = None) -> tuple:
"""
Get Strain
Args:
strain (Optional[str])
Returns:
tuple
"""
strains_db: str|LiteralString = self.dbs.get('strains')
db_conn = await sqlite3.connect(database=strains_db, timeout=2)
db_params: Optional[tuple] = None
if not strain:
db_query: str = "SELECT name, description FROM strains_w_desc ORDER BY random() LIMIT 1"
else:
db_query: str = "SELECT name, description FROM strains_w_desc WHERE name LIKE ?"
db_params: tuple = (f"%{strain.strip()}%",)
db_cursor: sqlite3.Cursor = await db_conn.execute(db_query, db_params)
db_result: tuple = await db_cursor.fetchone()
return db_result
async def get_qajoke(self) -> tuple:
"""
Get QA Joke
Returns:
tuple
"""
qajoke_db: str|LiteralString = self.dbs.get('qajoke')
async with sqlite3.connect(database=qajoke_db, timeout=2) as db:
async with await db.execute('SELECT question, answer FROM jokes ORDER BY RANDOM() LIMIT 1') as cursor:
(question, answer) = await cursor.fetchone()
return (question, answer)
return None
async def get_rjoke(self) -> tuple:
"""
Get r/joke Joke
Returns:
tuple
"""
rjokes_db: str|LiteralString = self.dbs.get('rjokes')
async with sqlite3.connect(database=rjokes_db, timeout=2) as db:
async with await db.execute('SELECT title, body, score FROM jokes WHERE score >= 100 ORDER BY RANDOM() LIMIT 1') as cursor:
(title, body, score) = await cursor.fetchone()
return (title, body, score)
return None
async def get_random_fact(self) -> str:
"""
Get Random Fact
Returns:
str
"""
try:
facts_api_url: str = "https://uselessfacts.jsph.pl/api/v2/facts/random"
facts_backup_url: str = "https://cnichols1734.pythonanywhere.com/facts/random"
async with ClientSession() as client:
try:
async with await client.get(facts_api_url,
timeout=ClientTimeout(connect=5, sock_read=5)) as request:
json: dict = await request.json()
fact: str = json.get('text')
if not fact:
raise BaseException("RandFact Src 1 Failed")
return fact
except:
async with await client.get(facts_backup_url,
timeout=ClientTimeout(connect=5, sock_read=5)) as request:
json: dict = await request.json()
fact: str = json.get('fact')
return fact
except Exception as e:
traceback.print_exc()
return f"Failed to get a random fact :( [{str(e)}]"
async def get_cookie(self) -> dict:
"""
Get Cookie
Returns:
dict
"""
async with sqlite3.connect(self.dbs.get('cookies'), timeout=2) as db_conn:
async with await db_conn.execute("SELECT name, origin, image_url FROM cookies ORDER BY RANDOM() LIMIT 1") as db_cursor:
(name, origin, image_url) = await db_cursor.fetchone()
return {
'name': name,
'origin': origin,
'image_url': image_url
}
def get_coffee(self) -> str:
"""
Get Coffee
Returns:
str
"""
try:
randomCoffee: str = random.choice(self.COFFEES)
if self.LAST_5_COFFEES and randomCoffee in self.LAST_5_COFFEES:
return self.get_coffee() # Recurse
if len(self.LAST_5_COFFEES) >= 5:
self.LAST_5_COFFEES.pop() # Store no more than 5 of the last served coffees
self.LAST_5_COFFEES.append(randomCoffee)
return randomCoffee
except:
traceback.print_exc()
return False
def get_days_to_xmas(self) -> tuple[int|float]:
"""
Get # of Days until Xmas
Returns:
tuple[int|float]
"""
today: datetime = datetime.datetime.now(tz=pytz.UTC)
xmas: datetime = datetime.datetime(
year=today.year,
month=12,
day=25,
tzinfo=pytz.UTC,
)
td: datetime.timedelta = (xmas - today) # pylint: disable=superfluous-parens
days, hours, minutes, seconds, us, ms = self.tdTuple(td)
return (days, hours, minutes, seconds, ms, us)
async def get_randmsg(self) -> str:
"""
Get Random Message from randmsg.db
Returns:
str
"""
randmsg_db = self.dbs.get('randmsg')
async with sqlite3.connect(database=randmsg_db,
timeout=2) as db_conn:
async with await db_conn.execute("SELECT msg FROM msgs ORDER BY RANDOM() LIMIT 1") as db_cursor:
(result,) = await db_cursor.fetchone()
return result