initial push

This commit is contained in:
2025-02-13 14:51:35 -05:00
commit 59caad4c74
19 changed files with 4104 additions and 0 deletions

6
util/__init__.py Normal file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env python3.12
import importlib
from . import discord_helpers
importlib.reload(discord_helpers)

52
util/discord_helpers.py Normal file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3.12
import re
import discord
from typing import Optional, Any
"""
Discord Helper Methods
"""
async def get_channel_by_name(bot: discord.Bot, channel: str,
guild: int | None = None) -> Optional[Any]: # Optional[Any] used as pycord channel types can be ambigious
"""
Get Channel by Name
Args:
bot (discord.Bot)
channel (str)
guild (int|None)
Returns:
Optional[Any]
"""
channel: str = re.sub(r'^#', '', channel.strip())
if not guild:
return discord.utils.get(bot.get_all_channels(),
name=channel)
else:
channels: list = bot.get_guild(guild).channels
for _channel in channels:
if _channel.name.lower() == channel.lower().strip():
return _channel
return
async def send_message(bot: discord.Bot, channel: str,
message: str, guild: int | None = None) -> None:
"""
Send Message to the provided channel. If guild is provided, will limit to channels within that guild to ensure the correct
channel is selected. Useful in the event a channel exists in more than one guild that the bot resides in.
Args:
bot (discord.Bot)
channel (str)
message (str)
guild (int|None)
Returns:
None
"""
if channel.isnumeric():
channel: int = int(channel)
_channel = bot.get_channel(channel)
else:
channel: str = re.sub(r'^#', '', channel.strip())
_channel = await get_channel_by_name(bot=bot,
channel=channel, guild=guild)
await _channel.send(message)

162
util/lovehate_db.py Normal file
View File

@ -0,0 +1,162 @@
#!/usr/bin/env python3.12
import os
import logging
from typing import Optional, LiteralString
import aiosqlite as sqlite3
from constructors import LoveHateException
class DB:
"""LoveHate DB Utility Class"""
def __init__(self, bot) -> None:
self.db_path: str|LiteralString = os.path.join("/usr/local/share",
"sqlite_dbs", "lovehate.db")
async def get_wholovehates(self, thing: str, loves: bool = False,
hates: bool = False) -> list[tuple]|bool:
"""
Get a list of users who have professed their love OR hatred for <thing>
Args:
thing (str): The <thing> to check
loves (bool): Are we looking for loves?
hates (bool): ...or are we looking for hates?
Returns:
list[tuple]
"""
query: str = "SELECT display_name FROM lovehate WHERE thing LIKE ? AND flag = ?"
params: tuple = tuple()
flag: Optional[int] = None
if hates and loves:
raise LoveHateException("Both hates and loves may not be True")
elif hates:
flag: int = -1
elif loves:
flag: int = 1
elif not hates and not loves:
raise LoveHateException("Neither loves nor hates were requested")
params: tuple = (thing, flag,)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(query, params) as db_cursor:
result: list[tuple] = await db_cursor.fetchall()
logging.debug("Result: %s", result)
if not result:
return False
return result
async def get_lovehates(self, loves: bool = False, hates: bool = False,
user: str = None, thing: str = None) -> list[tuple]|bool:
"""
Get a list of either 1) what {user} loves/hates, or who loves/hates {thing}, depending on bools loves, hates
Args:
loves (bool): Are we looking for loves?
hates (bool): ...OR are we looking for hates?
user (Optional[str]): the user to query against
thing (Optional[str]): ... OR the thing to query against
Returns:
list[tuple]|bool
"""
query: str = ""
params: tuple = tuple()
if not user and not thing:
raise LoveHateException("Neither a <user> or <thing> was specified to query against")
flag: Optional[int] = None
if hates and loves:
raise LoveHateException("Both hates and loves may not be True")
elif hates:
flag: int = -1
elif loves:
flag: int = 1
elif not hates and not loves:
raise LoveHateException("Neither loves nor hates were requested")
if user:
query: str = "SELECT thing FROM lovehate WHERE display_name LIKE ? AND flag == ?"
params: tuple = (user, flag,)
elif thing:
query: str = "SELECT display_name FROM lovehate WHERE thing LIKE ? AND flag == ?"
params: tuple = (thing, flag,)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(query, params) as db_cursor:
result = await db_cursor.fetchall()
if not result:
return False
return result
async def check_existence(self, user: str, thing: str) -> Optional[int]:
"""
Determine whether a user is opinionated on a <thing>
Args:
user (str): The user to check
thing (str): The thing to check if the user has an opinion on
Returns:
Optional[int]
"""
params = (user, thing,)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute("SELECT id, flag FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?", params) as db_cursor:
result = await db_cursor.fetchone()
if not result:
return None
(_, flag) = result
return flag
async def update(self, user: str, thing: str, flag: int) -> str:
"""
Updates the lovehate database, and returns an appropriate response
Args:
user (str): The user to update
thing (str): The thing the user loves/hates/doesn't care about anymore
flag (int): int representation of love (1), hate (-1), and dontcare (0)
Returns:
str
"""
if not flag in range(-1, 2):
raise LoveHateException(f"Invalid flag {flag} specified, is this love (1), hate (-1), or dontcare? (0)")
db_query: str = ""
params: tuple = (user, thing,)
already_opinionated: bool = await self.check_existence(user, thing)
if already_opinionated:
if flag == 0:
db_query: str = "DELETE FROM lovehate WHERE display_name LIKE ? AND thing LIKE ?"
else:
loves_or_hates: str = "loves"
if already_opinionated == -1:
loves_or_hates: str = "hates"
raise LoveHateException(f"But {user} already {loves_or_hates} {thing}...")
else:
match flag:
case -1:
db_query: str = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, -1, ?)"
case 1:
db_query: str = "INSERT INTO lovehate(display_name, flag, thing) VALUES(?, 1, ?)"
case _:
raise LoveHateException("Unknown error, default case matched")
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(db_query, params) as db_cursor:
await db_conn.commit()
if db_cursor.rowcount != 1:
raise LoveHateException(f"DB Error - RowCount: {db_cursor.rowcount} for INSERT query")
match flag:
case -1:
return f"We're done here, {user} hates {thing}."
case 0:
return f"We're done here, {user} no longer cares one way or the other about {thing}."
case 1:
return f"We're done here, {user} loves {thing}."
case _:
raise LoveHateException("Unknown error, default case matched [2]")

139
util/sing_util.py Normal file
View File

@ -0,0 +1,139 @@
#!/usr/bin/env python3.12
import logging
import regex
import aiohttp
import textwrap
import traceback
from typing import Optional
from discord import Activity
class Utility:
"""Sing Utility"""
def __init__(self):
self.api_url: str = "http://127.0.0.1:52111/lyric/search"
self.api_src: str = "DISC-HAVOC"
def parse_song_input(self, song: Optional[str] = None,
activity: Optional[Activity] = None) -> bool|tuple:
"""Parse Song (Sing Command) Input
Args:
song (Optional[str]): Song to search
activity (Optional[discord.Activity]): Discord activity, used to attempt lookup if no song is provided
Returns:
bool|tuple
"""
logging.debug("Activity? %s", activity)
try:
if (not song or len(song) < 2) and not activity:
# pylint: disable=superfluous-parens
return False
# pylint: enable=superfluous-parens
if not song and activity:
match activity.name.lower():
case "codey toons" | "cider" | "sonixd":
search_artist: str = " ".join(str(activity.state)\
.strip().split(" ")[1:])
search_artist: str = regex.sub(r"(\s{0,})(\[(spotify|tidal|sonixd|browser|yt music)])$", "",
search_artist.strip(), flags=regex.IGNORECASE)
search_song: str = str(activity.details)
song: str = f"{search_artist} : {search_song}"
case "tidal hi-fi":
search_artist: str = str(activity.state)
search_song: str = str(activity.details)
song: str = f"{search_artist} : {search_song}"
case "spotify":
search_artist: str = str(activity.title)
search_song: str = str(activity.artist)
song: str = f"{search_artist} : {search_song}"
case "serious.fm" | "cocks.fm" | "something":
if not activity.details:
song: str = str(activity.state)
else:
search_artist: str = str(activity.state)
search_song: str = str(activity.details)
song: str = f"{search_artist} : {search_song}"
case _:
return False # Unsupported activity detected
search_split_by: str = ":" if not(song) or len(song.split(":")) > 1\
else "-" # Support either : or - to separate artist/track
search_artist: str = song.split(search_split_by)[0].strip()
search_song: str = "".join(song.split(search_split_by)[1:]).strip()
search_subsearch: Optional[str] = None
if search_split_by == ":" and len(song.split(":")) > 2: # Support sub-search if : is used (per instructions)
search_song: str = song.split(search_split_by)[1].strip() # Reduce search_song to only the 2nd split of : [the rest is meant to be lyric text]
search_subsearch: str = "".join(song.split(search_split_by)[2:]) # Lyric text from split index 2 and beyond
return (search_artist, search_song, search_subsearch)
except:
traceback.print_exc()
return False
async def lyric_search(self, artist: str, song: str,
sub: Optional[str] = None) -> list[str]:
"""
Lyric Search
Args:
artist (str): Artist to search
song (str): Song to search
sub (Optional[str]): Lyrics for subsearch
"""
try:
if not artist or not song:
return ["FAIL! Artist/Song not provided"]
search_obj: dict = {
'a': artist.strip(),
's': song.strip(),
'extra': True,
'src': self.api_src,
}
if len(song.strip()) < 1:
search_obj.pop('a')
search_obj.pop('s')
search_obj['t'] = artist.strip() # Parse failed, try title without sep
if sub and len(sub) >= 2:
search_obj['sub'] = sub.strip()
async with aiohttp.ClientSession() as session:
async with await session.post(self.api_url,
json=search_obj,
timeout=aiohttp.ClientTimeout(connect=5, sock_read=10)) as request:
request.raise_for_status()
response: dict = await request.json()
if response.get('err'):
return [f"ERR: {response.get('errorText')}"]
out_lyrics = regex.sub(r'<br>', '\u200B\n', response.get('lyrics'))
response_obj: dict = {
'artist': response.get('artist'),
'song': response.get('song'),
'lyrics': out_lyrics,
'src': response.get('src'),
'confidence': float(response.get('confidence')),
'time': float(response.get('time')),
}
lyrics = response_obj.get('lyrics')
response_obj['lyrics'] = textwrap.wrap(text=lyrics.strip(),
width=4000, drop_whitespace=False,
replace_whitespace=False, break_long_words=True,
break_on_hyphens=True, max_lines=8)
response_obj['lyrics_short'] = textwrap.wrap(text=lyrics.strip(),
width=750, drop_whitespace=False,
replace_whitespace=False, break_long_words=True,
break_on_hyphens=True, max_lines=1)
return [
(
response_obj.get('artist'), response_obj.get('song'), response_obj.get('src'),
f"{int(response_obj.get('confidence'))}%",
f"{response_obj.get('time', -666.0):.4f}s",
),
response_obj.get('lyrics'),
response_obj.get('lyrics_short'),
]
except Exception as e:
traceback.print_exc()
return [f"Retrieval failed: {str(e)}"]