Compare commits

...

3 Commits

Author SHA1 Message Date
a6e1bf29de bugfix: Halloween command
12/25 isn't Christmas, apparently.  Who knew?
2025-10-31 07:25:03 -04:00
5d881369e9 . 2025-10-08 15:45:47 -04:00
190eb8acd2 feat: refactor Radio cog and update dependencies
feat: update lyric commands to utilize discord's Components v2
2025-10-08 15:45:38 -04:00
8 changed files with 2378 additions and 162 deletions

View File

@@ -248,7 +248,7 @@ class Misc(commands.Cog):
) # Invalid countdown from util ) # Invalid countdown from util
(days, hours, minutes, seconds, ms, _) = countdown (days, hours, minutes, seconds, ms, _) = countdown
now: datetime.datetime = datetime.datetime.now() now: datetime.datetime = datetime.datetime.now()
if now.month == 12 and now.day == 25: if now.month == 10 and now.day == 31:
return await ctx.respond("# IT IS HALLOWEEN!!!!!!!!\n-# GET ROWDY") return await ctx.respond("# IT IS HALLOWEEN!!!!!!!!\n-# GET ROWDY")
halloween_trans: dict = str.maketrans(emojis) halloween_trans: dict = str.maketrans(emojis)
try: try:

377
cogs/quote.py Normal file
View File

@@ -0,0 +1,377 @@
#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught
"""
Quote cog for Havoc
"""
import traceback
import time
import os
import datetime
from typing import Optional
import asyncio
import discord
import aiosqlite as sqlite3
from discord.ext import bridge, commands
from disc_havoc import Havoc
class DB:
"""DB Utility for Quote Cog"""
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.db_path = os.path.join(
"/", "usr", "local", "share", "sqlite_dbs", "quotes.db"
)
self.hp_chanid = 1157529874936909934
async def get_quote_count(self):
"""Get Quote Count"""
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
"SELECT COUNT (*) FROM quotes"
) as db_cursor:
result = await db_cursor.fetchone()
return result[-1]
async def remove_quote(self, quote_id: int):
"""Remove Quote from DB"""
try:
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
"DELETE FROM quotes WHERE id = ?", (quote_id,)
) as _:
await db_conn.commit()
return True
except Exception as e: # noqa
_channel = self.bot.get_channel(self.hp_chanid)
if isinstance(_channel, discord.TextChannel):
await _channel.send(traceback.format_exc())
return False
async def add_quote(
self,
message_id: int,
channel_id: int,
quoted_member_id: int,
message_time: int,
quoter_friendly: str,
quoted_friendly: str,
channel_friendly: str,
message_content: str,
):
"""Add Quote to DB"""
params = (
quoter_friendly,
int(time.time()),
quoted_friendly,
quoted_member_id,
channel_friendly,
channel_id,
message_id,
message_time,
quoter_friendly,
message_content,
)
try:
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
# pylint: disable=line-too-long
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(
"INSERT INTO quotes (added_by, added_at, quoted_user_display, quoted_user_memberid, quoted_channel_display, quoted_channel_id, quoted_message_id, quoted_message_time, added_by_friendly, quoted_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params,
) as _: # pylint: enable=line-too-long
await db_conn.commit()
return True
except Exception as e: # noqa
return traceback.format_exc()
async def fetch_quote(
self,
random: bool = False,
quoteid: Optional[int] = None,
added_by: Optional[str] = None,
quoted_user: Optional[str] = None,
content: Optional[str] = None,
):
"""Fetch Quote from DB"""
try:
query_head = "SELECT id, added_by_friendly, added_at, quoted_user_display, quoted_channel_display, quoted_message_time, quoted_message FROM quotes"
query = ""
params: Optional[tuple] = None
if random:
query = f"{query_head} ORDER BY RANDOM() LIMIT 1"
elif quoteid:
query = f"{query_head} WHERE id = ? LIMIT 1"
params = (quoteid,)
elif added_by:
query = f"{query_head} WHERE added_by_friendly LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{added_by}%",)
elif quoted_user:
query = f"{query_head} WHERE quoted_user_display LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{quoted_user}%",)
elif content:
query = f"{query_head} WHERE quoted_message LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{content}%",)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(query, params) as db_cursor:
results = await db_cursor.fetchall()
if not results:
return {
"err": "No results for query",
}
if random or quoteid:
chosen = results[-1]
return {str(k): v for k, v in chosen.items()}
else:
return [{str(k): v for k, v in _.items()} for _ in results]
except Exception as e: # noqa
return traceback.format_exc()
class Quote(commands.Cog):
"""Quote Cog for Havoc"""
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.db = DB(self.bot)
def is_homeserver(): # type: ignore
"""Check if channel/interaction is within homeserver"""
def predicate(ctx):
try:
return ctx.guild.id == 1145182936002482196
except Exception as e: # noqa
traceback.print_exc()
return False
return commands.check(predicate)
@commands.message_command(name="Add Quote")
async def add_quote(self, ctx, message: discord.Message):
"""Add A Quote"""
hp_chanid = 1157529874936909934
try:
if message.author.bot:
return await ctx.respond(
"Quotes are for real users, not bots.", ephemeral=True
)
quoter_friendly = ctx.author.display_name
quoted_message_id = message.id
quoted_channel_friendly = f"#{ctx.channel.name}"
quoted_channel_id = ctx.channel.id
message_content = message.content
message_author_friendly = message.author.display_name
message_author_id = message.author.id
message_time = int(message.created_at.timestamp())
message_escaped = discord.utils.escape_mentions(
discord.utils.escape_markdown(message_content)
).strip()
if len(message_escaped) < 3:
return await ctx.respond(
"**Error**: Message (text content) is not long enough to quote.",
ephemeral=True,
)
if len(message_escaped) > 512:
return await ctx.respond(
"**Error**: Message (text content) is too long to quote.",
ephemeral=True,
)
result = await self.db.add_quote(
message_id=quoted_message_id,
channel_id=quoted_channel_id,
quoted_member_id=message_author_id,
message_time=message_time,
quoter_friendly=quoter_friendly,
quoted_friendly=message_author_friendly,
channel_friendly=quoted_channel_friendly,
message_content=message_content,
)
if not result:
return await ctx.respond("Failed!", ephemeral=True)
else:
return await ctx.respond("OK!", ephemeral=True)
except Exception as e: # noqa
_channel = self.bot.get_channel(hp_chanid)
if not isinstance(_channel, discord.TextChannel):
return
await _channel.send(traceback.format_exc())
@bridge.bridge_command(aliases=["rand"])
@is_homeserver() # pylint: disable=too-many-function-args
async def randquote(self, ctx):
"""Get a random quote"""
try:
random_quote = await self.db.fetch_quote(random=True)
if random_quote.get("err"):
return await ctx.respond("Failed to get a quote")
quote_id = random_quote.get("id")
quoted_friendly = random_quote.get("quoted_user_display", "Unknown")
adder_friendly = random_quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
random_quote.get("quoted_message_time")
)
message_channel = random_quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(
random_quote.get("added_at")
)
quote_content = random_quote.get("quoted_message")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=message_time)
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
return await ctx.respond(embed=embed)
except Exception as e: # noqa
error = await ctx.respond(traceback.format_exc())
await asyncio.sleep(10)
await error.delete()
@bridge.bridge_command(aliases=["qg"])
@is_homeserver() # pylint: disable=too-many-function-args
async def quoteget(self, ctx, quoteid):
"""Get a specific quote by ID"""
try:
if not str(quoteid).strip().isnumeric():
return await ctx.respond("**Error**: Quote ID must be numeric.")
fetched_quote = await self.db.fetch_quote(quoteid=quoteid)
if fetched_quote.get("err"):
return await ctx.respond("**Error**: Quote not found")
quote_id = fetched_quote.get("id")
quoted_friendly = fetched_quote.get("quoted_user_display", "Unknown")
adder_friendly = fetched_quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
fetched_quote.get("quoted_message_time")
)
message_channel = fetched_quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(
fetched_quote.get("added_at")
)
quote_content = fetched_quote.get("quoted_message")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=message_time)
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
return await ctx.respond(embed=embed)
except Exception as e: # noqa
error = await ctx.respond(traceback.format_exc())
await asyncio.sleep(10)
await error.delete()
@bridge.bridge_command(aliases=["qs"])
@is_homeserver() # pylint: disable=too-many-function-args
async def quotesearch(self, ctx, *, content: str):
"""Search for a quote (by content)"""
try:
found_quotes = await self.db.fetch_quote(content=content)
if isinstance(found_quotes, dict) and found_quotes.get("err"):
return await ctx.respond(
f"Quote search failed: {found_quotes.get('err')}"
)
embeds = []
for quote in found_quotes:
quote_id = quote.get("id")
quoted_friendly = quote.get("quoted_user_display", "Unknown")
adder_friendly = quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
quote.get("quoted_message_time")
)
message_channel = quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(quote.get("added_at"))
quote_content = quote.get("quoted_message")
# await ctx.respond(f"**{quoted_friendly}**: {quote}")ed_friendly = quote.get('quoted_user_display', 'Unknown')
adder_friendly = quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
quote.get("quoted_message_time")
)
message_channel = quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(quote.get("added_at"))
quote = quote.get("quoted_message")
# await ctx.respond(f"**{quoted_friendly}**: {quote}")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=str(message_time))
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
embeds.append(embed)
return await ctx.respond(embeds=embeds)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
@bridge.bridge_command(aliases=["nq"])
@is_homeserver() # pylint: disable=too-many-function-args
async def nquotes(self, ctx):
"""Get # of quotes stored"""
try:
quote_count = await self.db.get_quote_count()
if not quote_count:
return await ctx.respond("**Error**: No quotes found!")
return await ctx.respond(
f"I currently have **{quote_count}** quotes stored."
)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
@bridge.bridge_command(aliases=["qr"])
@commands.is_owner()
@is_homeserver() # pylint: disable=too-many-function-args
async def quoteremove(self, ctx, quoteid):
"""Remove a quote (by id)
Owner only"""
try:
if not str(quoteid).strip().isnumeric():
return await ctx.respond("**Error**: Quote ID must be numeric.")
quoteid = int(quoteid)
remove_quote = await self.db.remove_quote(quoteid)
if not remove_quote:
return await ctx.respond("**Error**: Failed!", ephemeral=True)
return await ctx.respond("Removed!", ephemeral=True)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
def setup(bot):
"""Run on Cog Load"""
bot.add_cog(Quote(bot))

View File

@@ -1,5 +1,6 @@
import logging import logging
import traceback import traceback
import asyncio
from typing import Optional from typing import Optional
from discord.ext import bridge, commands, tasks from discord.ext import bridge, commands, tasks
from util.radio_util import get_now_playing, skip from util.radio_util import get_now_playing, skip
@@ -43,78 +44,86 @@ class Radio(commands.Cog):
return commands.check(predicate) return commands.check(predicate)
@bridge.bridge_command() # @bridge.bridge_command()
@commands.is_owner() # @commands.is_owner()
async def reinitradio(self, ctx) -> None: # async def reinitradio(self, ctx) -> None:
""" # """
Reinitialize serious.FM # Reinitialize serious.FM
""" # """
loop: discord.asyncio.AbstractEventLoop = self.bot.loop # loop: discord.asyncio.AbstractEventLoop = self.bot.loop
loop.create_task(self.radio_init()) # loop.create_task(self.radio_init())
await ctx.respond("Done!", ephemeral=True) # await ctx.respond("Done!", ephemeral=True)
# async def radio_init(self) -> None:
# """Init Radio"""
# try:
# (radio_guild, radio_chan) = self.channels["sfm"]
# guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
# if not guild:
# return
# channel = guild.get_channel(radio_chan)
# if not isinstance(channel, discord.VoiceChannel):
# return
# if not self.bot.voice_clients:
# await channel.connect()
# try:
# try:
# self.radio_state_loop.cancel()
# except Exception as e:
# logging.debug("Failed to cancel radio_state_loop: %s", str(e))
# self.radio_state_loop.start()
# logging.info("radio_state_loop task started!")
# except Exception as e:
# logging.critical("Could not start task... Exception: %s", str(e))
# traceback.print_exc()
# except Exception as e:
# logging.debug("Exception: %s", str(e))
# traceback.print_exc()
# return
async def radio_init(self) -> None: async def radio_init(self) -> None:
"""Init Radio"""
try: try:
(radio_guild, radio_chan) = self.channels["sfm"] self.radio_state_loop.start()
guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
if not guild:
return
channel = guild.get_channel(radio_chan)
if not isinstance(channel, discord.VoiceChannel):
return
if not self.bot.voice_clients:
await channel.connect()
try:
try:
self.radio_state_loop.cancel()
except Exception as e:
logging.debug("Failed to cancel radio_state_loop: %s", str(e))
self.radio_state_loop.start()
logging.info("radio_state_loop task started!")
except Exception as e:
logging.critical("Could not start task... Exception: %s", str(e))
traceback.print_exc()
except Exception as e: except Exception as e:
logging.debug("Exception: %s", str(e)) logging.critical("Failed to start radio state loop: %s", str(e))
traceback.print_exc() traceback.print_exc()
return
@tasks.loop(seconds=5.0) @tasks.loop(seconds=5.0)
async def radio_state_loop(self) -> None: async def radio_state_loop(self) -> None:
"""Radio State Loop""" """Radio State Loop"""
try: try:
(radio_guild, radio_chan) = self.channels["sfm"] # (radio_guild, radio_chan) = self.channels["sfm"]
try: # try:
vc: discord.VoiceProtocol = self.bot.voice_clients[-1] # vc: discord.VoiceProtocol = self.bot.voice_clients[-1]
except Exception as e: # except Exception as e:
logging.debug( # logging.debug(
"No voice client, establishing new VC connection... (Exception: %s)", # "No voice client, establishing new VC connection... (Exception: %s)",
str(e), # str(e),
) # )
guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild) # guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
if not guild: # if not guild:
return # return
channel = guild.get_channel(radio_chan) # channel = guild.get_channel(radio_chan)
if not isinstance(channel, discord.VoiceChannel): # if not isinstance(channel, discord.VoiceChannel):
return # return
await channel.connect() # await channel.connect()
vc = self.bot.voice_clients[-1] # vc = self.bot.voice_clients[-1]
if not vc.is_playing() or vc.is_paused(): # type: ignore # if not vc.is_playing() or vc.is_paused(): # type: ignore
""" # """
Mypy does not seem aware of the is_playing, play, and is_paused methods, # Mypy does not seem aware of the is_playing, play, and is_paused methods,
but they exist. # but they exist.
""" # """
logging.info("Detected VC not playing... playing!") # logging.info("Detected VC not playing... playing!")
source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL) # source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL)
vc.play( # type: ignore # vc.play( # type: ignore
source, # source,
after=lambda e: logging.info("Error: %s", e) if e else None, # after=lambda e: logging.info("Error: %s", e) if e else None,
) # )
# Get Now Playing # Get Now Playing
np_track: Optional[str] = await get_now_playing() np_track: Optional[str] = await get_now_playing()
if np_track and not self.LAST_NP_TRACK == np_track: if not self.LAST_NP_TRACK or (np_track and not self.LAST_NP_TRACK == np_track):
logging.critical("Setting: %s", np_track)
self.LAST_NP_TRACK = np_track self.LAST_NP_TRACK = np_track
await self.bot.change_presence( await self.bot.change_presence(
activity=discord.Activity( activity=discord.Activity(

View File

@@ -7,9 +7,137 @@ import regex
from util.sing_util import Utility from util.sing_util import Utility
from discord.ext import bridge, commands from discord.ext import bridge, commands
from disc_havoc import Havoc from disc_havoc import Havoc
import random
import string
import json
import os
BOT_CHANIDS = [] BOT_CHANIDS = []
# Global storage for pagination data (survives across interactions)
PAGINATION_DATA = {}
PAGINATION_FILE = "pagination_data.json"
def load_pagination_data():
"""Load pagination data from JSON file"""
global PAGINATION_DATA
try:
if os.path.exists(PAGINATION_FILE):
with open(PAGINATION_FILE, 'r') as f:
PAGINATION_DATA = json.load(f)
except Exception:
PAGINATION_DATA = {}
def save_pagination_data():
"""Save pagination data to JSON file (excluding non-serializable embeds)"""
try:
# Create serializable copy without embeds
serializable_data = {}
for key, value in PAGINATION_DATA.items():
serializable_data[key] = {
"pages": value["pages"],
"song": value["song"],
"artist": value["artist"],
"total_pages": value["total_pages"]
}
with open(PAGINATION_FILE, 'w') as f:
json.dump(serializable_data, f)
except Exception:
pass
import logging
from typing import Optional, Union
from regex import Pattern
import discord
import regex
from util.sing_util import Utility
from discord.ext import bridge, commands
from disc_havoc import Havoc
import random
import string
BOT_CHANIDS = []
# Global storage for pagination data (survives across interactions)
PAGINATION_DATA: dict[str, dict] = {}
class LyricsPaginator(discord.ui.View):
def __init__(self, pages, song, artist):
super().__init__(timeout=None) # No timeout - buttons work permanently
self.pages = pages
self.song = song
self.artist = artist
self.current_page = 0
# Generate a unique ID for this paginator instance
self.custom_id_base = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
# Pre-generate all embeds to avoid creating them on each page turn
self.embeds = []
# URL encode artist and song for the link
import urllib.parse
encoded_artist = urllib.parse.quote(self.artist, safe='')
encoded_song = urllib.parse.quote(self.song, safe='')
lyrics_url = f"https://codey.lol/#{encoded_artist}/{encoded_song}"
for i, page in enumerate(pages):
embed = discord.Embed(title=f"{self.song}", color=discord.Color.blue(), url=lyrics_url)
embed.set_author(name=self.artist)
embed.description = page
# Set footer with just page info for multi-page, or no footer for single page
if len(pages) > 1:
embed.set_footer(text=f"Page {i + 1} of {len(pages)}", icon_url="https://codey.lol/favicon.ico")
self.embeds.append(embed)
# Store pagination data for persistence across bot restarts
if len(pages) > 1:
# Store data (embeds will be recreated as needed since they can't be JSON serialized)
serializable_data = {
"pages": pages,
"song": song,
"artist": artist,
"total_pages": len(pages)
}
# Store in memory with embeds for performance
PAGINATION_DATA[self.custom_id_base] = {
**serializable_data,
"embeds": self.embeds
}
# Save serializable data to file
save_pagination_data()
def get_view_for_page(self, page_num: int):
"""Create a view with properly configured buttons for the given page"""
view = discord.ui.View(timeout=None)
if len(self.pages) > 1:
# Previous button
prev_button = discord.ui.Button(
label="Previous",
style=discord.ButtonStyle.secondary,
emoji="⬅️",
disabled=(page_num == 0),
custom_id=f"lyrics_prev:{self.custom_id_base}",
row=0
)
# Next button
next_button = discord.ui.Button(
label="Next",
style=discord.ButtonStyle.primary,
emoji="➡️",
disabled=(page_num == len(self.pages) - 1),
custom_id=f"lyrics_next:{self.custom_id_base}",
row=0
)
view.add_item(prev_button)
view.add_item(next_button)
return view
class Sing(commands.Cog): class Sing(commands.Cog):
"""Sing Cog for Havoc""" """Sing Cog for Havoc"""
@@ -23,6 +151,84 @@ class Sing(commands.Cog):
r"\x0f|\x1f|\035|\002|\u2064|\x02|(\x03([0-9]{1,2}))|(\x03|\003)(?:\d{1,2}(?:,\d{1,2})?)?", r"\x0f|\x1f|\035|\002|\u2064|\x02|(\x03([0-9]{1,2}))|(\x03|\003)(?:\d{1,2}(?:,\d{1,2})?)?",
regex.UNICODE, regex.UNICODE,
) )
# Persistent interactions handled by on_interaction listener
@commands.Cog.listener()
async def on_interaction(self, interaction: discord.Interaction):
"""Handle persistent lyrics pagination interactions"""
if interaction.type != discord.InteractionType.component:
return
custom_id = interaction.data.get("custom_id", "")
if not (custom_id.startswith("lyrics_prev:") or custom_id.startswith("lyrics_next:")):
return
try:
# Extract the base custom ID and direction
if custom_id.startswith("lyrics_prev:"):
direction = -1
base_id = custom_id.replace("lyrics_prev:", "")
else: # lyrics_next:
direction = 1
base_id = custom_id.replace("lyrics_next:", "")
# Get pagination data from JSON persistence
data = PAGINATION_DATA.get(base_id)
if not data:
await interaction.response.send_message(
"⚠️ **Navigation Expired**\n"
"These buttons are from a previous session. "
"Use `/s <song>` or `/sing <artist> : <song>` to get fresh lyrics!",
ephemeral=True,
)
return
# Recreate embeds if they don't exist (loaded from JSON)
if "embeds" not in data or not data["embeds"]:
paginator = LyricsPaginator(data["pages"], data["song"], data["artist"])
paginator.custom_id_base = base_id
data["embeds"] = paginator.embeds
# Update in-memory data
PAGINATION_DATA[base_id] = data
# Get current page from embed footer
if not interaction.message or not interaction.message.embeds:
await interaction.response.defer()
return
current_embed = interaction.message.embeds[0]
footer_text = current_embed.footer.text if current_embed.footer else ""
# Extract current page number (format: "Page X of Y")
current_page = 0
if "Page " in footer_text and " of " in footer_text:
try:
page_part = footer_text.split("Page ")[1].split(" of ")[0]
current_page = int(page_part) - 1
except:
pass
# Calculate new page
new_page = current_page + direction
total_pages = len(data["pages"])
if new_page < 0 or new_page >= total_pages:
await interaction.response.defer() # Just acknowledge, don't change anything
return
# Create new paginator and get view for new page
paginator = LyricsPaginator(data["pages"], data["song"], data["artist"])
paginator.custom_id_base = base_id # Preserve same custom ID
view = paginator.get_view_for_page(new_page)
# Update the message
await interaction.response.edit_message(embed=data["embeds"][new_page], view=view)
except Exception as e:
try:
await interaction.response.send_message(f"Error handling pagination: {str(e)}", ephemeral=True)
except:
pass
def is_spamchan(): # type: ignore def is_spamchan(): # type: ignore
"""Check if channel is spam chan""" """Check if channel is spam chan"""
@@ -60,18 +266,18 @@ class Sing(commands.Cog):
"**Error**: No song specified, no activity found to read." "**Error**: No song specified, no activity found to read."
) )
await ctx.respond( # Initial response that we'll edit later
"*Searching...*" message = await ctx.respond("*Searching for lyrics...*")
) # Must respond to interactions within 3 seconds, per Discord
parsed = self.utility.parse_song_input(song, activity) parsed = self.utility.parse_song_input(song, activity)
if isinstance(parsed, tuple): if isinstance(parsed, tuple):
(search_artist, search_song, search_subsearch) = parsed (search_artist, search_song, search_subsearch) = parsed
# Update the message to show what we're searching for
await message.edit(content=f"*Searching for '{search_song}' by {search_artist}...*")
# await ctx.respond(f"So, {search_song} by {search_artist}? Subsearch: {search_subsearch} I will try...") # Commented, useful for debugging
search_result: Optional[list] = await self.utility.lyric_search( search_result: Optional[list] = await self.utility.lyric_search(
search_artist, search_song, search_subsearch search_artist, search_song, search_subsearch, is_spam_channel=(ctx.channel.id in BOT_CHANIDS)
) )
if not search_result: if not search_result:
@@ -97,29 +303,26 @@ class Sing(commands.Cog):
search_result_wrapped_short: list[str] = search_result[ search_result_wrapped_short: list[str] = search_result[
2 2
] # Third is short wrapped lyrics ] # Third is short wrapped lyrics
if ctx.channel.id not in BOT_CHANIDS: # Now all channels get full pagination - removed the early return restriction
short_lyrics = " ".join(
search_result_wrapped_short
) # Replace with shortened lyrics for non spamchans
short_lyrics = regex.sub(
r"\p{Vert_Space}", " / ", short_lyrics.strip()
)
return await ctx.respond(
f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}"
)
out_messages: list = [] # Use the appropriate wrapped lyrics based on channel type
footer: str = "" # Placeholder is_spam_channel = ctx.channel.id in BOT_CHANIDS
for c, section in enumerate(search_result_wrapped): if is_spam_channel:
if c == len(search_result_wrapped): # Spam channels get full-length lyrics
footer = f"`Found on: {search_result_src}`" pages = search_result_wrapped.copy()
section = regex.sub(r"\p{Vert_Space}", " / ", section.strip()) else:
msg: str = f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}" # Non-spam channels get shorter lyrics for better UX
if c > 1: pages = search_result_wrapped.copy() # For now, still use full but we'll modify limits later
msg = "\n".join(msg.split("\n")[1:])
out_messages.append(msg.strip()) # Add source info to last page
for msg in out_messages: if pages:
await ctx.send(msg) pages[-1] += f"\n\n`Found on: {search_result_src}`"
# Create and send view with paginator
paginator = LyricsPaginator(pages, search_result_song, search_result_artist)
view = paginator.get_view_for_page(0)
# Edit the existing message to show the lyrics
await message.edit(content=None, embed=paginator.embeds[0], view=view)
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
await ctx.respond(f"ERR: {str(e)}") await ctx.respond(f"ERR: {str(e)}")
@@ -149,26 +352,29 @@ class Sing(commands.Cog):
) )
member_id: int = member.id # if not(member.id == PODY_ID) else 1234134345497837679 # Use Thomas for Pody! member_id: int = member.id # if not(member.id == PODY_ID) else 1234134345497837679 # Use Thomas for Pody!
activity: Optional[discord.Activity] = None activity: Optional[discord.Activity] = None
if IS_SPAMCHAN: # Initial response that we'll edit later
await ctx.respond(f"***Reading activity of {member_display}...***") message = await ctx.respond(f"*Reading activity of {member_display}...*" if IS_SPAMCHAN else "*Processing...*")
for _activity in ctx.interaction.guild.get_member(member_id).activities: for _activity in ctx.interaction.guild.get_member(member_id).activities:
if _activity.type == discord.ActivityType.listening: if _activity.type == discord.ActivityType.listening:
activity = _activity activity = _activity
parsed: Union[tuple, bool] = self.utility.parse_song_input( parsed: Union[tuple, bool] = self.utility.parse_song_input(
song=None, activity=activity song=None, activity=activity
) )
if not parsed: if not parsed:
return await ctx.respond( if IS_SPAMCHAN:
f"Could not parse activity of {member_display}.", ephemeral=True return await message.edit(content=f"Could not parse activity of {member_display}.")
) else:
return await ctx.respond(
f"Could not parse activity of {member_display}.", ephemeral=True
)
if isinstance(parsed, tuple): if isinstance(parsed, tuple):
(search_artist, search_song, search_subsearch) = parsed (search_artist, search_song, search_subsearch) = parsed
await ctx.respond( await message.edit(content=f"*Searching for '{search_song}' by {search_artist}...*")
"*Searching...*"
) # Must respond to interactions within 3 seconds, per Discord
search_result: Optional[list] = await self.utility.lyric_search( search_result: Optional[list] = await self.utility.lyric_search(
search_artist, search_song, search_subsearch search_artist, search_song, search_subsearch, is_spam_channel=(ctx.channel.id in BOT_CHANIDS)
) )
if not search_result: if not search_result:
await ctx.respond("ERR: No search result") await ctx.respond("ERR: No search result")
@@ -193,33 +399,24 @@ class Sing(commands.Cog):
2 2
] # Third index is shortened lyrics ] # Third index is shortened lyrics
if not IS_SPAMCHAN: # All channels now get full paginated response
short_lyrics = " ".join(
search_result_wrapped_short
) # Replace with shortened lyrics for non spamchans
short_lyrics = regex.sub(
r"\p{Vert_Space}", " / ", short_lyrics.strip()
)
return await ctx.respond(
f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}"
)
out_messages: list = [] # Create paginator for lyrics
footer: str = "" if not search_result_wrapped:
c: int = 0 return await ctx.respond("No lyrics found.")
for section in search_result_wrapped:
c += 1 # Use the already smartly-wrapped pages from sing_util
if c == len(search_result_wrapped): pages = search_result_wrapped.copy() # Already processed by _smart_lyrics_wrap
footer = f"`Found on: {search_result_src}`"
# if ctx.guild.id == 1145182936002482196: # Add source info to last page
# section = section.upper() if pages:
section = regex.sub(r"\p{Vert_Space}", " / ", section.strip()) pages[-1] += f"\n\n`Found on: {search_result_src}`"
msg: str = f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}"
if c > 1: # Create and send view with paginator
msg = "\n".join(msg.split("\n")[1:]) paginator = LyricsPaginator(pages, search_result_song, search_result_artist)
out_messages.append(msg.strip()) view = paginator.get_view_for_page(0)
for msg in out_messages: # Edit the existing message to show the lyrics
await ctx.send(msg) await message.edit(content=None, embed=paginator.embeds[0], view=view)
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
return await ctx.respond(f"ERR: {str(e)}") return await ctx.respond(f"ERR: {str(e)}")
@@ -227,4 +424,5 @@ class Sing(commands.Cog):
def setup(bot) -> None: def setup(bot) -> None:
"""Run on Cog Load""" """Run on Cog Load"""
load_pagination_data() # Load existing pagination data from file
bot.add_cog(Sing(bot)) bot.add_cog(Sing(bot))

View File

@@ -25,10 +25,11 @@ cogs_list: list[str] = [
"sing", "sing",
"meme", "meme",
"lovehate", "lovehate",
"radio", # "ollama",
# "radio",
] ]
bot_activity = discord.CustomActivity(name="I made cookies!") bot_activity = discord.CustomActivity(name="I LIKE TURTLES")
load_dotenv() load_dotenv()
@@ -42,7 +43,7 @@ class Havoc(bridge.Bot):
command_prefix=".", command_prefix=".",
intents=intents, intents=intents,
owner_ids=OWNERS, owner_ids=OWNERS,
activity=bot_activity, activity=None,
help_command=commands.MinimalHelpCommand(), help_command=commands.MinimalHelpCommand(),
) )
self.BOT_CHANIDS = BOT_CHANIDS self.BOT_CHANIDS = BOT_CHANIDS
@@ -83,6 +84,7 @@ class Havoc(bridge.Bot):
@commands.Cog.listener() @commands.Cog.listener()
async def on_ready(self) -> None: async def on_ready(self) -> None:
"""Run on Bot Ready""" """Run on Bot Ready"""
await self.change_presence(activity=None)
logging.info("%s online!", self.user) logging.info("%s online!", self.user)

View File

@@ -25,7 +25,11 @@ async def get_now_playing() -> Optional[str]:
) as request: ) as request:
request.raise_for_status() request.raise_for_status()
response_json = await request.json() response_json = await request.json()
artistsong = response_json.get("artistsong") artistsong: str = "N/A - N/A"
artist: Optional[str] = response_json.get("artist")
song: Optional[str] = response_json.get("song")
if artist and song:
artistsong = f"{artist} - {song}"
return artistsong return artistsong
except Exception as e: except Exception as e:
logging.critical("Now playing retrieval failed: %s", str(e)) logging.critical("Now playing retrieval failed: %s", str(e))

View File

@@ -1,10 +1,12 @@
import logging import logging
import regex
import aiohttp
import textwrap
import traceback import traceback
import aiohttp
import regex
import discord
from discord import Activity from discord import Activity
from typing import Optional, Union
logger = logging.getLogger(__name__)
class Utility: class Utility:
@@ -14,9 +16,134 @@ class Utility:
self.api_url: str = "http://127.0.0.1:52111/lyric/search" self.api_url: str = "http://127.0.0.1:52111/lyric/search"
self.api_src: str = "DISC-HAVOC" self.api_src: str = "DISC-HAVOC"
def _smart_lyrics_wrap(self, lyrics: str, max_length: int = 1500, single_page: bool = False, max_verses: int = 100, max_lines: int = 150) -> list[str]:
"""
Intelligently wrap lyrics to avoid breaking verses in the middle
Prioritizes keeping verses intact over page length consistency
Args:
lyrics: Raw lyrics text
max_length: Maximum character length per page (soft limit)
single_page: If True, return only the first page
Returns:
List of lyrics pages
"""
if not lyrics:
return []
# Strip markdown formatting from lyrics
lyrics = discord.utils.escape_markdown(lyrics)
verses = []
current_verse: list[str] = []
# Handle both regular newlines and zero-width space newlines
lines = lyrics.replace("\u200b\n", "\n").split("\n")
empty_line_count = 0
for line in lines:
stripped_line = line.strip()
if not stripped_line or stripped_line in ["", "\u200b"]:
empty_line_count += 1
# One empty line indicates a section break (be more aggressive)
if empty_line_count >= 1 and current_verse:
verses.append("\n".join(current_verse))
current_verse = []
empty_line_count = 0
else:
empty_line_count = 0
current_verse.append(stripped_line)
# Add the last verse if it exists
if current_verse:
verses.append("\n".join(current_verse))
# If we have too few verses (verse detection failed), fallback to line-based splitting
if len(verses) <= 1:
all_lines = lyrics.split("\n")
verses = []
current_chunk = []
for line in all_lines:
current_chunk.append(line.strip())
# Split every 8-10 lines to create artificial "verses"
if len(current_chunk) >= 8:
verses.append("\n".join(current_chunk))
current_chunk = []
# Add remaining lines
if current_chunk:
verses.append("\n".join(current_chunk))
if not verses:
return [lyrics[:max_length]]
# If single page requested, return first verse or truncated
if single_page:
result = verses[0]
if len(result) > max_length:
# Try to fit at least part of the first verse
lines_in_verse = result.split("\n")
truncated_lines = []
current_length = 0
for line in lines_in_verse:
if current_length + len(line) + 1 <= max_length - 3: # -3 for "..."
truncated_lines.append(line)
current_length += len(line) + 1
else:
break
result = "\n".join(truncated_lines) + "..." if truncated_lines else verses[0][:max_length-3] + "..."
return [result]
# Group complete verses into pages, never breaking a verse
# Limit by character count, verse count, AND line count for visual appeal
max_verses_per_page = max_verses
max_lines_per_page = max_lines
pages = []
current_page_verses: list[str] = []
current_page_length = 0
current_page_lines = 0
for verse in verses:
verse_length = len(verse)
# Count lines properly - handle both regular newlines and zero-width space newlines
verse_line_count = verse.count("\n") + verse.count("\u200b\n") + 1
# Calculate totals if we add this verse (including separator)
separator_length = 3 if current_page_verses else 0 # "\n\n\n" between verses
separator_lines = 3 if current_page_verses else 0 # 3 empty lines between verses
total_length_with_verse = current_page_length + separator_length + verse_length
total_lines_with_verse = current_page_lines + separator_lines + verse_line_count
# Check all three limits: character, verse count, and line count
exceeds_length = total_length_with_verse > max_length
exceeds_verse_count = len(current_page_verses) >= max_verses_per_page
exceeds_line_count = total_lines_with_verse > max_lines_per_page
# If adding this verse would exceed any limit AND we already have verses on the page
if (exceeds_length or exceeds_verse_count or exceeds_line_count) and current_page_verses:
# Finish current page with existing verses
pages.append("\n\n".join(current_page_verses))
current_page_verses = [verse]
current_page_length = verse_length
current_page_lines = verse_line_count
else:
# Add verse to current page
current_page_verses.append(verse)
current_page_length = total_length_with_verse
current_page_lines = total_lines_with_verse
# Add the last page if it has content
if current_page_verses:
pages.append("\n\n".join(current_page_verses))
return pages if pages else [lyrics[:max_length]]
def parse_song_input( def parse_song_input(
self, song: Optional[str] = None, activity: Optional[Activity] = None self, song: str | None = None, activity: Activity | None = None,
) -> Union[bool, tuple]: ) -> bool | tuple:
""" """
Parse Song (Sing Command) Input Parse Song (Sing Command) Input
@@ -36,7 +163,7 @@ class Utility:
match activity.name.lower(): match activity.name.lower():
case "codey toons" | "cider" | "sonixd": case "codey toons" | "cider" | "sonixd":
search_artist: str = " ".join( search_artist: str = " ".join(
str(activity.state).strip().split(" ")[1:] str(activity.state).strip().split(" ")[1:],
) )
search_artist = regex.sub( search_artist = regex.sub(
r"(\s{0,})(\[(spotify|tidal|sonixd|browser|yt music)])$", r"(\s{0,})(\[(spotify|tidal|sonixd|browser|yt music)])$",
@@ -51,13 +178,13 @@ class Utility:
search_song = str(activity.details) search_song = str(activity.details)
song = f"{search_artist} : {search_song}" song = f"{search_artist} : {search_song}"
case "spotify": case "spotify":
if not activity.title or not activity.artist: # type: ignore if not activity.title or not activity.artist: # type: ignore[attr-defined]
""" """
Attributes exist, but mypy does not recognize them. Ignored. Attributes exist, but mypy does not recognize them. Ignored.
""" """
return False return False
search_artist = str(activity.artist) # type: ignore search_artist = str(activity.artist) # type: ignore[attr-defined]
search_song = str(activity.title) # type: ignore search_song = str(activity.title) # type: ignore[attr-defined]
song = f"{search_artist} : {search_song}" song = f"{search_artist} : {search_song}"
case "serious.fm" | "cocks.fm" | "something": case "serious.fm" | "cocks.fm" | "something":
if not activity.details: if not activity.details:
@@ -78,27 +205,27 @@ class Utility:
return False return False
search_artist = song.split(search_split_by)[0].strip() search_artist = song.split(search_split_by)[0].strip()
search_song = "".join(song.split(search_split_by)[1:]).strip() search_song = "".join(song.split(search_split_by)[1:]).strip()
search_subsearch: Optional[str] = None search_subsearch: str | None = None
if ( if (
search_split_by == ":" and len(song.split(":")) > 2 search_split_by == ":" and len(song.split(":")) > 2
): # Support sub-search if : is used (per instructions) ): # Support sub-search if : is used (per instructions)
search_song = song.split( search_song = song.split(
search_split_by search_split_by,
)[ )[
1 1
].strip() # Reduce search_song to only the 2nd split of : [the rest is meant to be lyric text] ].strip() # Reduce search_song to only the 2nd split of : [the rest is meant to be lyric text]
search_subsearch = "".join( search_subsearch = "".join(
song.split(search_split_by)[2:] song.split(search_split_by)[2:],
) # Lyric text from split index 2 and beyond ) # Lyric text from split index 2 and beyond
return (search_artist, search_song, search_subsearch) return (search_artist, search_song, search_subsearch)
except Exception as e: except Exception as e:
logging.debug("Exception: %s", str(e)) logger.debug("Exception: %s", str(e))
traceback.print_exc() traceback.print_exc()
return False return False
async def lyric_search( async def lyric_search(
self, artist: str, song: str, sub: Optional[str] = None self, artist: str, song: str, sub: str | None = None, is_spam_channel: bool = True,
) -> Optional[list]: ) -> list | None:
""" """
Lyric Search Lyric Search
@@ -140,7 +267,7 @@ class Utility:
return [(f"ERR: {response.get('errorText')}",)] return [(f"ERR: {response.get('errorText')}",)]
out_lyrics = regex.sub( out_lyrics = regex.sub(
r"<br>", "\u200b\n", response.get("lyrics", "") r"<br>", "\u200b\n", response.get("lyrics", ""),
) )
response_obj: dict = { response_obj: dict = {
"artist": response.get("artist"), "artist": response.get("artist"),
@@ -154,24 +281,15 @@ class Utility:
lyrics = response_obj.get("lyrics") lyrics = response_obj.get("lyrics")
if not lyrics: if not lyrics:
return None return None
response_obj["lyrics"] = textwrap.wrap( # Use different limits based on channel type
text=lyrics.strip(), if is_spam_channel:
width=1500, # Spam channels: higher limits for more content per page
drop_whitespace=False, response_obj["lyrics"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=8000, max_verses=100, max_lines=150)
replace_whitespace=False, else:
break_long_words=True, # Non-spam channels: much shorter limits for better UX in regular channels
break_on_hyphens=True, response_obj["lyrics"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=2000, max_verses=15, max_lines=25)
max_lines=8,
) response_obj["lyrics_short"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=500, single_page=True)
response_obj["lyrics_short"] = textwrap.wrap(
text=lyrics.strip(),
width=750,
drop_whitespace=False,
replace_whitespace=False,
break_long_words=True,
break_on_hyphens=True,
max_lines=1,
)
return [ return [
( (
@@ -186,4 +304,4 @@ class Utility:
] ]
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
return [f"Retrieval failed: {str(e)}"] return [f"Retrieval failed: {e!s}"]

1508
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff