feat: refactor Radio cog and update dependencies

feat: update lyric commands to utilize discord's Components v2
This commit is contained in:
2025-10-08 15:45:38 -04:00
parent 80cc3dc1e8
commit 190eb8acd2
4 changed files with 455 additions and 63 deletions

377
cogs/quote.py Normal file
View File

@@ -0,0 +1,377 @@
#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught
"""
Quote cog for Havoc
"""
import traceback
import time
import os
import datetime
from typing import Optional
import asyncio
import discord
import aiosqlite as sqlite3
from discord.ext import bridge, commands
from disc_havoc import Havoc
class DB:
"""DB Utility for Quote Cog"""
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.db_path = os.path.join(
"/", "usr", "local", "share", "sqlite_dbs", "quotes.db"
)
self.hp_chanid = 1157529874936909934
async def get_quote_count(self):
"""Get Quote Count"""
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
"SELECT COUNT (*) FROM quotes"
) as db_cursor:
result = await db_cursor.fetchone()
return result[-1]
async def remove_quote(self, quote_id: int):
"""Remove Quote from DB"""
try:
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
"DELETE FROM quotes WHERE id = ?", (quote_id,)
) as _:
await db_conn.commit()
return True
except Exception as e: # noqa
_channel = self.bot.get_channel(self.hp_chanid)
if isinstance(_channel, discord.TextChannel):
await _channel.send(traceback.format_exc())
return False
async def add_quote(
self,
message_id: int,
channel_id: int,
quoted_member_id: int,
message_time: int,
quoter_friendly: str,
quoted_friendly: str,
channel_friendly: str,
message_content: str,
):
"""Add Quote to DB"""
params = (
quoter_friendly,
int(time.time()),
quoted_friendly,
quoted_member_id,
channel_friendly,
channel_id,
message_id,
message_time,
quoter_friendly,
message_content,
)
try:
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
# pylint: disable=line-too-long
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(
"INSERT INTO quotes (added_by, added_at, quoted_user_display, quoted_user_memberid, quoted_channel_display, quoted_channel_id, quoted_message_id, quoted_message_time, added_by_friendly, quoted_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params,
) as _: # pylint: enable=line-too-long
await db_conn.commit()
return True
except Exception as e: # noqa
return traceback.format_exc()
async def fetch_quote(
self,
random: bool = False,
quoteid: Optional[int] = None,
added_by: Optional[str] = None,
quoted_user: Optional[str] = None,
content: Optional[str] = None,
):
"""Fetch Quote from DB"""
try:
query_head = "SELECT id, added_by_friendly, added_at, quoted_user_display, quoted_channel_display, quoted_message_time, quoted_message FROM quotes"
query = ""
params: Optional[tuple] = None
if random:
query = f"{query_head} ORDER BY RANDOM() LIMIT 1"
elif quoteid:
query = f"{query_head} WHERE id = ? LIMIT 1"
params = (quoteid,)
elif added_by:
query = f"{query_head} WHERE added_by_friendly LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{added_by}%",)
elif quoted_user:
query = f"{query_head} WHERE quoted_user_display LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{quoted_user}%",)
elif content:
query = f"{query_head} WHERE quoted_message LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{content}%",)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(query, params) as db_cursor:
results = await db_cursor.fetchall()
if not results:
return {
"err": "No results for query",
}
if random or quoteid:
chosen = results[-1]
return {str(k): v for k, v in chosen.items()}
else:
return [{str(k): v for k, v in _.items()} for _ in results]
except Exception as e: # noqa
return traceback.format_exc()
class Quote(commands.Cog):
"""Quote Cog for Havoc"""
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.db = DB(self.bot)
def is_homeserver(): # type: ignore
"""Check if channel/interaction is within homeserver"""
def predicate(ctx):
try:
return ctx.guild.id == 1145182936002482196
except Exception as e: # noqa
traceback.print_exc()
return False
return commands.check(predicate)
@commands.message_command(name="Add Quote")
async def add_quote(self, ctx, message: discord.Message):
"""Add A Quote"""
hp_chanid = 1157529874936909934
try:
if message.author.bot:
return await ctx.respond(
"Quotes are for real users, not bots.", ephemeral=True
)
quoter_friendly = ctx.author.display_name
quoted_message_id = message.id
quoted_channel_friendly = f"#{ctx.channel.name}"
quoted_channel_id = ctx.channel.id
message_content = message.content
message_author_friendly = message.author.display_name
message_author_id = message.author.id
message_time = int(message.created_at.timestamp())
message_escaped = discord.utils.escape_mentions(
discord.utils.escape_markdown(message_content)
).strip()
if len(message_escaped) < 3:
return await ctx.respond(
"**Error**: Message (text content) is not long enough to quote.",
ephemeral=True,
)
if len(message_escaped) > 512:
return await ctx.respond(
"**Error**: Message (text content) is too long to quote.",
ephemeral=True,
)
result = await self.db.add_quote(
message_id=quoted_message_id,
channel_id=quoted_channel_id,
quoted_member_id=message_author_id,
message_time=message_time,
quoter_friendly=quoter_friendly,
quoted_friendly=message_author_friendly,
channel_friendly=quoted_channel_friendly,
message_content=message_content,
)
if not result:
return await ctx.respond("Failed!", ephemeral=True)
else:
return await ctx.respond("OK!", ephemeral=True)
except Exception as e: # noqa
_channel = self.bot.get_channel(hp_chanid)
if not isinstance(_channel, discord.TextChannel):
return
await _channel.send(traceback.format_exc())
@bridge.bridge_command(aliases=["rand"])
@is_homeserver() # pylint: disable=too-many-function-args
async def randquote(self, ctx):
"""Get a random quote"""
try:
random_quote = await self.db.fetch_quote(random=True)
if random_quote.get("err"):
return await ctx.respond("Failed to get a quote")
quote_id = random_quote.get("id")
quoted_friendly = random_quote.get("quoted_user_display", "Unknown")
adder_friendly = random_quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
random_quote.get("quoted_message_time")
)
message_channel = random_quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(
random_quote.get("added_at")
)
quote_content = random_quote.get("quoted_message")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=message_time)
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
return await ctx.respond(embed=embed)
except Exception as e: # noqa
error = await ctx.respond(traceback.format_exc())
await asyncio.sleep(10)
await error.delete()
@bridge.bridge_command(aliases=["qg"])
@is_homeserver() # pylint: disable=too-many-function-args
async def quoteget(self, ctx, quoteid):
"""Get a specific quote by ID"""
try:
if not str(quoteid).strip().isnumeric():
return await ctx.respond("**Error**: Quote ID must be numeric.")
fetched_quote = await self.db.fetch_quote(quoteid=quoteid)
if fetched_quote.get("err"):
return await ctx.respond("**Error**: Quote not found")
quote_id = fetched_quote.get("id")
quoted_friendly = fetched_quote.get("quoted_user_display", "Unknown")
adder_friendly = fetched_quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
fetched_quote.get("quoted_message_time")
)
message_channel = fetched_quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(
fetched_quote.get("added_at")
)
quote_content = fetched_quote.get("quoted_message")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=message_time)
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
return await ctx.respond(embed=embed)
except Exception as e: # noqa
error = await ctx.respond(traceback.format_exc())
await asyncio.sleep(10)
await error.delete()
@bridge.bridge_command(aliases=["qs"])
@is_homeserver() # pylint: disable=too-many-function-args
async def quotesearch(self, ctx, *, content: str):
"""Search for a quote (by content)"""
try:
found_quotes = await self.db.fetch_quote(content=content)
if isinstance(found_quotes, dict) and found_quotes.get("err"):
return await ctx.respond(
f"Quote search failed: {found_quotes.get('err')}"
)
embeds = []
for quote in found_quotes:
quote_id = quote.get("id")
quoted_friendly = quote.get("quoted_user_display", "Unknown")
adder_friendly = quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
quote.get("quoted_message_time")
)
message_channel = quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(quote.get("added_at"))
quote_content = quote.get("quoted_message")
# await ctx.respond(f"**{quoted_friendly}**: {quote}")ed_friendly = quote.get('quoted_user_display', 'Unknown')
adder_friendly = quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
quote.get("quoted_message_time")
)
message_channel = quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(quote.get("added_at"))
quote = quote.get("quoted_message")
# await ctx.respond(f"**{quoted_friendly}**: {quote}")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=str(message_time))
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
embeds.append(embed)
return await ctx.respond(embeds=embeds)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
@bridge.bridge_command(aliases=["nq"])
@is_homeserver() # pylint: disable=too-many-function-args
async def nquotes(self, ctx):
"""Get # of quotes stored"""
try:
quote_count = await self.db.get_quote_count()
if not quote_count:
return await ctx.respond("**Error**: No quotes found!")
return await ctx.respond(
f"I currently have **{quote_count}** quotes stored."
)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
@bridge.bridge_command(aliases=["qr"])
@commands.is_owner()
@is_homeserver() # pylint: disable=too-many-function-args
async def quoteremove(self, ctx, quoteid):
"""Remove a quote (by id)
Owner only"""
try:
if not str(quoteid).strip().isnumeric():
return await ctx.respond("**Error**: Quote ID must be numeric.")
quoteid = int(quoteid)
remove_quote = await self.db.remove_quote(quoteid)
if not remove_quote:
return await ctx.respond("**Error**: Failed!", ephemeral=True)
return await ctx.respond("Removed!", ephemeral=True)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
def setup(bot):
"""Run on Cog Load"""
bot.add_cog(Quote(bot))

View File

@@ -1,5 +1,6 @@
import logging
import traceback
import asyncio
from typing import Optional
from discord.ext import bridge, commands, tasks
from util.radio_util import get_now_playing, skip
@@ -43,78 +44,86 @@ class Radio(commands.Cog):
return commands.check(predicate)
@bridge.bridge_command()
@commands.is_owner()
async def reinitradio(self, ctx) -> None:
"""
Reinitialize serious.FM
"""
loop: discord.asyncio.AbstractEventLoop = self.bot.loop
loop.create_task(self.radio_init())
await ctx.respond("Done!", ephemeral=True)
# @bridge.bridge_command()
# @commands.is_owner()
# async def reinitradio(self, ctx) -> None:
# """
# Reinitialize serious.FM
# """
# loop: discord.asyncio.AbstractEventLoop = self.bot.loop
# loop.create_task(self.radio_init())
# await ctx.respond("Done!", ephemeral=True)
# async def radio_init(self) -> None:
# """Init Radio"""
# try:
# (radio_guild, radio_chan) = self.channels["sfm"]
# guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
# if not guild:
# return
# channel = guild.get_channel(radio_chan)
# if not isinstance(channel, discord.VoiceChannel):
# return
# if not self.bot.voice_clients:
# await channel.connect()
# try:
# try:
# self.radio_state_loop.cancel()
# except Exception as e:
# logging.debug("Failed to cancel radio_state_loop: %s", str(e))
# self.radio_state_loop.start()
# logging.info("radio_state_loop task started!")
# except Exception as e:
# logging.critical("Could not start task... Exception: %s", str(e))
# traceback.print_exc()
# except Exception as e:
# logging.debug("Exception: %s", str(e))
# traceback.print_exc()
# return
async def radio_init(self) -> None:
"""Init Radio"""
try:
(radio_guild, radio_chan) = self.channels["sfm"]
guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
if not guild:
return
channel = guild.get_channel(radio_chan)
if not isinstance(channel, discord.VoiceChannel):
return
if not self.bot.voice_clients:
await channel.connect()
try:
try:
self.radio_state_loop.cancel()
except Exception as e:
logging.debug("Failed to cancel radio_state_loop: %s", str(e))
self.radio_state_loop.start()
logging.info("radio_state_loop task started!")
except Exception as e:
logging.critical("Could not start task... Exception: %s", str(e))
traceback.print_exc()
self.radio_state_loop.start()
except Exception as e:
logging.debug("Exception: %s", str(e))
logging.critical("Failed to start radio state loop: %s", str(e))
traceback.print_exc()
return
@tasks.loop(seconds=5.0)
async def radio_state_loop(self) -> None:
"""Radio State Loop"""
try:
(radio_guild, radio_chan) = self.channels["sfm"]
try:
vc: discord.VoiceProtocol = self.bot.voice_clients[-1]
except Exception as e:
logging.debug(
"No voice client, establishing new VC connection... (Exception: %s)",
str(e),
)
guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
if not guild:
return
channel = guild.get_channel(radio_chan)
if not isinstance(channel, discord.VoiceChannel):
return
await channel.connect()
vc = self.bot.voice_clients[-1]
# (radio_guild, radio_chan) = self.channels["sfm"]
# try:
# vc: discord.VoiceProtocol = self.bot.voice_clients[-1]
# except Exception as e:
# logging.debug(
# "No voice client, establishing new VC connection... (Exception: %s)",
# str(e),
# )
# guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
# if not guild:
# return
# channel = guild.get_channel(radio_chan)
# if not isinstance(channel, discord.VoiceChannel):
# return
# await channel.connect()
# vc = self.bot.voice_clients[-1]
if not vc.is_playing() or vc.is_paused(): # type: ignore
"""
Mypy does not seem aware of the is_playing, play, and is_paused methods,
but they exist.
"""
logging.info("Detected VC not playing... playing!")
source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL)
vc.play( # type: ignore
source,
after=lambda e: logging.info("Error: %s", e) if e else None,
)
# if not vc.is_playing() or vc.is_paused(): # type: ignore
# """
# Mypy does not seem aware of the is_playing, play, and is_paused methods,
# but they exist.
# """
# logging.info("Detected VC not playing... playing!")
# source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL)
# vc.play( # type: ignore
# source,
# after=lambda e: logging.info("Error: %s", e) if e else None,
# )
# Get Now Playing
np_track: Optional[str] = await get_now_playing()
if np_track and not self.LAST_NP_TRACK == np_track:
if not self.LAST_NP_TRACK or (np_track and not self.LAST_NP_TRACK == np_track):
logging.critical("Setting: %s", np_track)
self.LAST_NP_TRACK = np_track
await self.bot.change_presence(
activity=discord.Activity(

View File

@@ -25,10 +25,11 @@ cogs_list: list[str] = [
"sing",
"meme",
"lovehate",
"radio",
"ollama",
# "radio",
]
bot_activity = discord.CustomActivity(name="I made cookies!")
bot_activity = discord.CustomActivity(name="I LIKE TURTLES")
load_dotenv()
@@ -42,7 +43,7 @@ class Havoc(bridge.Bot):
command_prefix=".",
intents=intents,
owner_ids=OWNERS,
activity=bot_activity,
activity=None,
help_command=commands.MinimalHelpCommand(),
)
self.BOT_CHANIDS = BOT_CHANIDS
@@ -83,6 +84,7 @@ class Havoc(bridge.Bot):
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Run on Bot Ready"""
await self.change_presence(activity=None)
logging.info("%s online!", self.user)

View File

@@ -25,7 +25,11 @@ async def get_now_playing() -> Optional[str]:
) as request:
request.raise_for_status()
response_json = await request.json()
artistsong = response_json.get("artistsong")
artistsong: str = "N/A - N/A"
artist: Optional[str] = response_json.get("artist")
song: Optional[str] = response_json.get("song")
if artist and song:
artistsong = f"{artist} - {song}"
return artistsong
except Exception as e:
logging.critical("Now playing retrieval failed: %s", str(e))