Compare commits

...

2 Commits

Author SHA1 Message Date
5d881369e9 . 2025-10-08 15:45:47 -04:00
190eb8acd2 feat: refactor Radio cog and update dependencies
feat: update lyric commands to utilize discord's Components v2
2025-10-08 15:45:38 -04:00
7 changed files with 2377 additions and 161 deletions

377
cogs/quote.py Normal file
View File

@@ -0,0 +1,377 @@
#!/usr/bin/env python3.12
# pylint: disable=bare-except, broad-exception-caught
"""
Quote cog for Havoc
"""
import traceback
import time
import os
import datetime
from typing import Optional
import asyncio
import discord
import aiosqlite as sqlite3
from discord.ext import bridge, commands
from disc_havoc import Havoc
class DB:
"""DB Utility for Quote Cog"""
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.db_path = os.path.join(
"/", "usr", "local", "share", "sqlite_dbs", "quotes.db"
)
self.hp_chanid = 1157529874936909934
async def get_quote_count(self):
"""Get Quote Count"""
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
"SELECT COUNT (*) FROM quotes"
) as db_cursor:
result = await db_cursor.fetchone()
return result[-1]
async def remove_quote(self, quote_id: int):
"""Remove Quote from DB"""
try:
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
async with await db_conn.execute(
"DELETE FROM quotes WHERE id = ?", (quote_id,)
) as _:
await db_conn.commit()
return True
except Exception as e: # noqa
_channel = self.bot.get_channel(self.hp_chanid)
if isinstance(_channel, discord.TextChannel):
await _channel.send(traceback.format_exc())
return False
async def add_quote(
self,
message_id: int,
channel_id: int,
quoted_member_id: int,
message_time: int,
quoter_friendly: str,
quoted_friendly: str,
channel_friendly: str,
message_content: str,
):
"""Add Quote to DB"""
params = (
quoter_friendly,
int(time.time()),
quoted_friendly,
quoted_member_id,
channel_friendly,
channel_id,
message_id,
message_time,
quoter_friendly,
message_content,
)
try:
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
# pylint: disable=line-too-long
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(
"INSERT INTO quotes (added_by, added_at, quoted_user_display, quoted_user_memberid, quoted_channel_display, quoted_channel_id, quoted_message_id, quoted_message_time, added_by_friendly, quoted_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params,
) as _: # pylint: enable=line-too-long
await db_conn.commit()
return True
except Exception as e: # noqa
return traceback.format_exc()
async def fetch_quote(
self,
random: bool = False,
quoteid: Optional[int] = None,
added_by: Optional[str] = None,
quoted_user: Optional[str] = None,
content: Optional[str] = None,
):
"""Fetch Quote from DB"""
try:
query_head = "SELECT id, added_by_friendly, added_at, quoted_user_display, quoted_channel_display, quoted_message_time, quoted_message FROM quotes"
query = ""
params: Optional[tuple] = None
if random:
query = f"{query_head} ORDER BY RANDOM() LIMIT 1"
elif quoteid:
query = f"{query_head} WHERE id = ? LIMIT 1"
params = (quoteid,)
elif added_by:
query = f"{query_head} WHERE added_by_friendly LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{added_by}%",)
elif quoted_user:
query = f"{query_head} WHERE quoted_user_display LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{quoted_user}%",)
elif content:
query = f"{query_head} WHERE quoted_message LIKE ? ORDER BY RANDOM() LIMIT 5"
params = (f"%{content}%",)
async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
async with await db_conn.execute(query, params) as db_cursor:
results = await db_cursor.fetchall()
if not results:
return {
"err": "No results for query",
}
if random or quoteid:
chosen = results[-1]
return {str(k): v for k, v in chosen.items()}
else:
return [{str(k): v for k, v in _.items()} for _ in results]
except Exception as e: # noqa
return traceback.format_exc()
class Quote(commands.Cog):
"""Quote Cog for Havoc"""
def __init__(self, bot: Havoc):
self.bot: Havoc = bot
self.db = DB(self.bot)
def is_homeserver(): # type: ignore
"""Check if channel/interaction is within homeserver"""
def predicate(ctx):
try:
return ctx.guild.id == 1145182936002482196
except Exception as e: # noqa
traceback.print_exc()
return False
return commands.check(predicate)
@commands.message_command(name="Add Quote")
async def add_quote(self, ctx, message: discord.Message):
"""Add A Quote"""
hp_chanid = 1157529874936909934
try:
if message.author.bot:
return await ctx.respond(
"Quotes are for real users, not bots.", ephemeral=True
)
quoter_friendly = ctx.author.display_name
quoted_message_id = message.id
quoted_channel_friendly = f"#{ctx.channel.name}"
quoted_channel_id = ctx.channel.id
message_content = message.content
message_author_friendly = message.author.display_name
message_author_id = message.author.id
message_time = int(message.created_at.timestamp())
message_escaped = discord.utils.escape_mentions(
discord.utils.escape_markdown(message_content)
).strip()
if len(message_escaped) < 3:
return await ctx.respond(
"**Error**: Message (text content) is not long enough to quote.",
ephemeral=True,
)
if len(message_escaped) > 512:
return await ctx.respond(
"**Error**: Message (text content) is too long to quote.",
ephemeral=True,
)
result = await self.db.add_quote(
message_id=quoted_message_id,
channel_id=quoted_channel_id,
quoted_member_id=message_author_id,
message_time=message_time,
quoter_friendly=quoter_friendly,
quoted_friendly=message_author_friendly,
channel_friendly=quoted_channel_friendly,
message_content=message_content,
)
if not result:
return await ctx.respond("Failed!", ephemeral=True)
else:
return await ctx.respond("OK!", ephemeral=True)
except Exception as e: # noqa
_channel = self.bot.get_channel(hp_chanid)
if not isinstance(_channel, discord.TextChannel):
return
await _channel.send(traceback.format_exc())
@bridge.bridge_command(aliases=["rand"])
@is_homeserver() # pylint: disable=too-many-function-args
async def randquote(self, ctx):
"""Get a random quote"""
try:
random_quote = await self.db.fetch_quote(random=True)
if random_quote.get("err"):
return await ctx.respond("Failed to get a quote")
quote_id = random_quote.get("id")
quoted_friendly = random_quote.get("quoted_user_display", "Unknown")
adder_friendly = random_quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
random_quote.get("quoted_message_time")
)
message_channel = random_quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(
random_quote.get("added_at")
)
quote_content = random_quote.get("quoted_message")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=message_time)
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
return await ctx.respond(embed=embed)
except Exception as e: # noqa
error = await ctx.respond(traceback.format_exc())
await asyncio.sleep(10)
await error.delete()
@bridge.bridge_command(aliases=["qg"])
@is_homeserver() # pylint: disable=too-many-function-args
async def quoteget(self, ctx, quoteid):
"""Get a specific quote by ID"""
try:
if not str(quoteid).strip().isnumeric():
return await ctx.respond("**Error**: Quote ID must be numeric.")
fetched_quote = await self.db.fetch_quote(quoteid=quoteid)
if fetched_quote.get("err"):
return await ctx.respond("**Error**: Quote not found")
quote_id = fetched_quote.get("id")
quoted_friendly = fetched_quote.get("quoted_user_display", "Unknown")
adder_friendly = fetched_quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
fetched_quote.get("quoted_message_time")
)
message_channel = fetched_quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(
fetched_quote.get("added_at")
)
quote_content = fetched_quote.get("quoted_message")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=message_time)
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
return await ctx.respond(embed=embed)
except Exception as e: # noqa
error = await ctx.respond(traceback.format_exc())
await asyncio.sleep(10)
await error.delete()
@bridge.bridge_command(aliases=["qs"])
@is_homeserver() # pylint: disable=too-many-function-args
async def quotesearch(self, ctx, *, content: str):
"""Search for a quote (by content)"""
try:
found_quotes = await self.db.fetch_quote(content=content)
if isinstance(found_quotes, dict) and found_quotes.get("err"):
return await ctx.respond(
f"Quote search failed: {found_quotes.get('err')}"
)
embeds = []
for quote in found_quotes:
quote_id = quote.get("id")
quoted_friendly = quote.get("quoted_user_display", "Unknown")
adder_friendly = quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
quote.get("quoted_message_time")
)
message_channel = quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(quote.get("added_at"))
quote_content = quote.get("quoted_message")
# await ctx.respond(f"**{quoted_friendly}**: {quote}")ed_friendly = quote.get('quoted_user_display', 'Unknown')
adder_friendly = quote.get("added_by_friendly", "Unknown")
message_time = datetime.datetime.fromtimestamp(
quote.get("quoted_message_time")
)
message_channel = quote.get("quoted_channel_display")
quote_added_at = datetime.datetime.fromtimestamp(quote.get("added_at"))
quote = quote.get("quoted_message")
# await ctx.respond(f"**{quoted_friendly}**: {quote}")
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Quote #{quote_id}",
)
embed.description = f"**{quoted_friendly}:** {quote_content}"
embed.add_field(name="Original Message Time", value=str(message_time))
embed.add_field(name="Channel", value=message_channel)
embed.add_field(name="Quote ID", value=quote_id)
embed.footer = discord.EmbedFooter(
text=f"Added by {adder_friendly} {quote_added_at}"
)
embeds.append(embed)
return await ctx.respond(embeds=embeds)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
@bridge.bridge_command(aliases=["nq"])
@is_homeserver() # pylint: disable=too-many-function-args
async def nquotes(self, ctx):
"""Get # of quotes stored"""
try:
quote_count = await self.db.get_quote_count()
if not quote_count:
return await ctx.respond("**Error**: No quotes found!")
return await ctx.respond(
f"I currently have **{quote_count}** quotes stored."
)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
@bridge.bridge_command(aliases=["qr"])
@commands.is_owner()
@is_homeserver() # pylint: disable=too-many-function-args
async def quoteremove(self, ctx, quoteid):
"""Remove a quote (by id)
Owner only"""
try:
if not str(quoteid).strip().isnumeric():
return await ctx.respond("**Error**: Quote ID must be numeric.")
quoteid = int(quoteid)
remove_quote = await self.db.remove_quote(quoteid)
if not remove_quote:
return await ctx.respond("**Error**: Failed!", ephemeral=True)
return await ctx.respond("Removed!", ephemeral=True)
except Exception as e:
await ctx.respond(f"Error: {type(e).__name__} - {str(e)}")
def setup(bot):
"""Run on Cog Load"""
bot.add_cog(Quote(bot))

View File

@@ -1,5 +1,6 @@
import logging
import traceback
import asyncio
from typing import Optional
from discord.ext import bridge, commands, tasks
from util.radio_util import get_now_playing, skip
@@ -43,78 +44,86 @@ class Radio(commands.Cog):
return commands.check(predicate)
@bridge.bridge_command()
@commands.is_owner()
async def reinitradio(self, ctx) -> None:
"""
Reinitialize serious.FM
"""
loop: discord.asyncio.AbstractEventLoop = self.bot.loop
loop.create_task(self.radio_init())
await ctx.respond("Done!", ephemeral=True)
# @bridge.bridge_command()
# @commands.is_owner()
# async def reinitradio(self, ctx) -> None:
# """
# Reinitialize serious.FM
# """
# loop: discord.asyncio.AbstractEventLoop = self.bot.loop
# loop.create_task(self.radio_init())
# await ctx.respond("Done!", ephemeral=True)
# async def radio_init(self) -> None:
# """Init Radio"""
# try:
# (radio_guild, radio_chan) = self.channels["sfm"]
# guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
# if not guild:
# return
# channel = guild.get_channel(radio_chan)
# if not isinstance(channel, discord.VoiceChannel):
# return
# if not self.bot.voice_clients:
# await channel.connect()
# try:
# try:
# self.radio_state_loop.cancel()
# except Exception as e:
# logging.debug("Failed to cancel radio_state_loop: %s", str(e))
# self.radio_state_loop.start()
# logging.info("radio_state_loop task started!")
# except Exception as e:
# logging.critical("Could not start task... Exception: %s", str(e))
# traceback.print_exc()
# except Exception as e:
# logging.debug("Exception: %s", str(e))
# traceback.print_exc()
# return
async def radio_init(self) -> None:
"""Init Radio"""
try:
(radio_guild, radio_chan) = self.channels["sfm"]
guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
if not guild:
return
channel = guild.get_channel(radio_chan)
if not isinstance(channel, discord.VoiceChannel):
return
if not self.bot.voice_clients:
await channel.connect()
try:
try:
self.radio_state_loop.cancel()
except Exception as e:
logging.debug("Failed to cancel radio_state_loop: %s", str(e))
self.radio_state_loop.start()
logging.info("radio_state_loop task started!")
except Exception as e:
logging.critical("Could not start task... Exception: %s", str(e))
traceback.print_exc()
self.radio_state_loop.start()
except Exception as e:
logging.debug("Exception: %s", str(e))
logging.critical("Failed to start radio state loop: %s", str(e))
traceback.print_exc()
return
@tasks.loop(seconds=5.0)
async def radio_state_loop(self) -> None:
"""Radio State Loop"""
try:
(radio_guild, radio_chan) = self.channels["sfm"]
try:
vc: discord.VoiceProtocol = self.bot.voice_clients[-1]
except Exception as e:
logging.debug(
"No voice client, establishing new VC connection... (Exception: %s)",
str(e),
)
guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
if not guild:
return
channel = guild.get_channel(radio_chan)
if not isinstance(channel, discord.VoiceChannel):
return
await channel.connect()
vc = self.bot.voice_clients[-1]
# (radio_guild, radio_chan) = self.channels["sfm"]
# try:
# vc: discord.VoiceProtocol = self.bot.voice_clients[-1]
# except Exception as e:
# logging.debug(
# "No voice client, establishing new VC connection... (Exception: %s)",
# str(e),
# )
# guild: Optional[discord.Guild] = self.bot.get_guild(radio_guild)
# if not guild:
# return
# channel = guild.get_channel(radio_chan)
# if not isinstance(channel, discord.VoiceChannel):
# return
# await channel.connect()
# vc = self.bot.voice_clients[-1]
if not vc.is_playing() or vc.is_paused(): # type: ignore
"""
Mypy does not seem aware of the is_playing, play, and is_paused methods,
but they exist.
"""
logging.info("Detected VC not playing... playing!")
source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL)
vc.play( # type: ignore
source,
after=lambda e: logging.info("Error: %s", e) if e else None,
)
# if not vc.is_playing() or vc.is_paused(): # type: ignore
# """
# Mypy does not seem aware of the is_playing, play, and is_paused methods,
# but they exist.
# """
# logging.info("Detected VC not playing... playing!")
# source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL)
# vc.play( # type: ignore
# source,
# after=lambda e: logging.info("Error: %s", e) if e else None,
# )
# Get Now Playing
np_track: Optional[str] = await get_now_playing()
if np_track and not self.LAST_NP_TRACK == np_track:
if not self.LAST_NP_TRACK or (np_track and not self.LAST_NP_TRACK == np_track):
logging.critical("Setting: %s", np_track)
self.LAST_NP_TRACK = np_track
await self.bot.change_presence(
activity=discord.Activity(

View File

@@ -7,9 +7,137 @@ import regex
from util.sing_util import Utility
from discord.ext import bridge, commands
from disc_havoc import Havoc
import random
import string
import json
import os
BOT_CHANIDS = []
# Global storage for pagination data (survives across interactions)
PAGINATION_DATA = {}
PAGINATION_FILE = "pagination_data.json"
def load_pagination_data():
"""Load pagination data from JSON file"""
global PAGINATION_DATA
try:
if os.path.exists(PAGINATION_FILE):
with open(PAGINATION_FILE, 'r') as f:
PAGINATION_DATA = json.load(f)
except Exception:
PAGINATION_DATA = {}
def save_pagination_data():
"""Save pagination data to JSON file (excluding non-serializable embeds)"""
try:
# Create serializable copy without embeds
serializable_data = {}
for key, value in PAGINATION_DATA.items():
serializable_data[key] = {
"pages": value["pages"],
"song": value["song"],
"artist": value["artist"],
"total_pages": value["total_pages"]
}
with open(PAGINATION_FILE, 'w') as f:
json.dump(serializable_data, f)
except Exception:
pass
import logging
from typing import Optional, Union
from regex import Pattern
import discord
import regex
from util.sing_util import Utility
from discord.ext import bridge, commands
from disc_havoc import Havoc
import random
import string
BOT_CHANIDS = []
# Global storage for pagination data (survives across interactions)
PAGINATION_DATA: dict[str, dict] = {}
class LyricsPaginator(discord.ui.View):
def __init__(self, pages, song, artist):
super().__init__(timeout=None) # No timeout - buttons work permanently
self.pages = pages
self.song = song
self.artist = artist
self.current_page = 0
# Generate a unique ID for this paginator instance
self.custom_id_base = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
# Pre-generate all embeds to avoid creating them on each page turn
self.embeds = []
# URL encode artist and song for the link
import urllib.parse
encoded_artist = urllib.parse.quote(self.artist, safe='')
encoded_song = urllib.parse.quote(self.song, safe='')
lyrics_url = f"https://codey.lol/#{encoded_artist}/{encoded_song}"
for i, page in enumerate(pages):
embed = discord.Embed(title=f"{self.song}", color=discord.Color.blue(), url=lyrics_url)
embed.set_author(name=self.artist)
embed.description = page
# Set footer with just page info for multi-page, or no footer for single page
if len(pages) > 1:
embed.set_footer(text=f"Page {i + 1} of {len(pages)}", icon_url="https://codey.lol/favicon.ico")
self.embeds.append(embed)
# Store pagination data for persistence across bot restarts
if len(pages) > 1:
# Store data (embeds will be recreated as needed since they can't be JSON serialized)
serializable_data = {
"pages": pages,
"song": song,
"artist": artist,
"total_pages": len(pages)
}
# Store in memory with embeds for performance
PAGINATION_DATA[self.custom_id_base] = {
**serializable_data,
"embeds": self.embeds
}
# Save serializable data to file
save_pagination_data()
def get_view_for_page(self, page_num: int):
"""Create a view with properly configured buttons for the given page"""
view = discord.ui.View(timeout=None)
if len(self.pages) > 1:
# Previous button
prev_button = discord.ui.Button(
label="Previous",
style=discord.ButtonStyle.secondary,
emoji="⬅️",
disabled=(page_num == 0),
custom_id=f"lyrics_prev:{self.custom_id_base}",
row=0
)
# Next button
next_button = discord.ui.Button(
label="Next",
style=discord.ButtonStyle.primary,
emoji="➡️",
disabled=(page_num == len(self.pages) - 1),
custom_id=f"lyrics_next:{self.custom_id_base}",
row=0
)
view.add_item(prev_button)
view.add_item(next_button)
return view
class Sing(commands.Cog):
"""Sing Cog for Havoc"""
@@ -23,6 +151,84 @@ class Sing(commands.Cog):
r"\x0f|\x1f|\035|\002|\u2064|\x02|(\x03([0-9]{1,2}))|(\x03|\003)(?:\d{1,2}(?:,\d{1,2})?)?",
regex.UNICODE,
)
# Persistent interactions handled by on_interaction listener
@commands.Cog.listener()
async def on_interaction(self, interaction: discord.Interaction):
"""Handle persistent lyrics pagination interactions"""
if interaction.type != discord.InteractionType.component:
return
custom_id = interaction.data.get("custom_id", "")
if not (custom_id.startswith("lyrics_prev:") or custom_id.startswith("lyrics_next:")):
return
try:
# Extract the base custom ID and direction
if custom_id.startswith("lyrics_prev:"):
direction = -1
base_id = custom_id.replace("lyrics_prev:", "")
else: # lyrics_next:
direction = 1
base_id = custom_id.replace("lyrics_next:", "")
# Get pagination data from JSON persistence
data = PAGINATION_DATA.get(base_id)
if not data:
await interaction.response.send_message(
"⚠️ **Navigation Expired**\n"
"These buttons are from a previous session. "
"Use `/s <song>` or `/sing <artist> : <song>` to get fresh lyrics!",
ephemeral=True,
)
return
# Recreate embeds if they don't exist (loaded from JSON)
if "embeds" not in data or not data["embeds"]:
paginator = LyricsPaginator(data["pages"], data["song"], data["artist"])
paginator.custom_id_base = base_id
data["embeds"] = paginator.embeds
# Update in-memory data
PAGINATION_DATA[base_id] = data
# Get current page from embed footer
if not interaction.message or not interaction.message.embeds:
await interaction.response.defer()
return
current_embed = interaction.message.embeds[0]
footer_text = current_embed.footer.text if current_embed.footer else ""
# Extract current page number (format: "Page X of Y")
current_page = 0
if "Page " in footer_text and " of " in footer_text:
try:
page_part = footer_text.split("Page ")[1].split(" of ")[0]
current_page = int(page_part) - 1
except:
pass
# Calculate new page
new_page = current_page + direction
total_pages = len(data["pages"])
if new_page < 0 or new_page >= total_pages:
await interaction.response.defer() # Just acknowledge, don't change anything
return
# Create new paginator and get view for new page
paginator = LyricsPaginator(data["pages"], data["song"], data["artist"])
paginator.custom_id_base = base_id # Preserve same custom ID
view = paginator.get_view_for_page(new_page)
# Update the message
await interaction.response.edit_message(embed=data["embeds"][new_page], view=view)
except Exception as e:
try:
await interaction.response.send_message(f"Error handling pagination: {str(e)}", ephemeral=True)
except:
pass
def is_spamchan(): # type: ignore
"""Check if channel is spam chan"""
@@ -60,18 +266,18 @@ class Sing(commands.Cog):
"**Error**: No song specified, no activity found to read."
)
await ctx.respond(
"*Searching...*"
) # Must respond to interactions within 3 seconds, per Discord
# Initial response that we'll edit later
message = await ctx.respond("*Searching for lyrics...*")
parsed = self.utility.parse_song_input(song, activity)
if isinstance(parsed, tuple):
(search_artist, search_song, search_subsearch) = parsed
# Update the message to show what we're searching for
await message.edit(content=f"*Searching for '{search_song}' by {search_artist}...*")
# await ctx.respond(f"So, {search_song} by {search_artist}? Subsearch: {search_subsearch} I will try...") # Commented, useful for debugging
search_result: Optional[list] = await self.utility.lyric_search(
search_artist, search_song, search_subsearch
search_artist, search_song, search_subsearch, is_spam_channel=(ctx.channel.id in BOT_CHANIDS)
)
if not search_result:
@@ -97,29 +303,26 @@ class Sing(commands.Cog):
search_result_wrapped_short: list[str] = search_result[
2
] # Third is short wrapped lyrics
if ctx.channel.id not in BOT_CHANIDS:
short_lyrics = " ".join(
search_result_wrapped_short
) # Replace with shortened lyrics for non spamchans
short_lyrics = regex.sub(
r"\p{Vert_Space}", " / ", short_lyrics.strip()
)
return await ctx.respond(
f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}"
)
# Now all channels get full pagination - removed the early return restriction
out_messages: list = []
footer: str = "" # Placeholder
for c, section in enumerate(search_result_wrapped):
if c == len(search_result_wrapped):
footer = f"`Found on: {search_result_src}`"
section = regex.sub(r"\p{Vert_Space}", " / ", section.strip())
msg: str = f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}"
if c > 1:
msg = "\n".join(msg.split("\n")[1:])
out_messages.append(msg.strip())
for msg in out_messages:
await ctx.send(msg)
# Use the appropriate wrapped lyrics based on channel type
is_spam_channel = ctx.channel.id in BOT_CHANIDS
if is_spam_channel:
# Spam channels get full-length lyrics
pages = search_result_wrapped.copy()
else:
# Non-spam channels get shorter lyrics for better UX
pages = search_result_wrapped.copy() # For now, still use full but we'll modify limits later
# Add source info to last page
if pages:
pages[-1] += f"\n\n`Found on: {search_result_src}`"
# Create and send view with paginator
paginator = LyricsPaginator(pages, search_result_song, search_result_artist)
view = paginator.get_view_for_page(0)
# Edit the existing message to show the lyrics
await message.edit(content=None, embed=paginator.embeds[0], view=view)
except Exception as e:
traceback.print_exc()
await ctx.respond(f"ERR: {str(e)}")
@@ -149,26 +352,29 @@ class Sing(commands.Cog):
)
member_id: int = member.id # if not(member.id == PODY_ID) else 1234134345497837679 # Use Thomas for Pody!
activity: Optional[discord.Activity] = None
if IS_SPAMCHAN:
await ctx.respond(f"***Reading activity of {member_display}...***")
# Initial response that we'll edit later
message = await ctx.respond(f"*Reading activity of {member_display}...*" if IS_SPAMCHAN else "*Processing...*")
for _activity in ctx.interaction.guild.get_member(member_id).activities:
if _activity.type == discord.ActivityType.listening:
activity = _activity
parsed: Union[tuple, bool] = self.utility.parse_song_input(
song=None, activity=activity
)
if not parsed:
return await ctx.respond(
f"Could not parse activity of {member_display}.", ephemeral=True
)
if IS_SPAMCHAN:
return await message.edit(content=f"Could not parse activity of {member_display}.")
else:
return await ctx.respond(
f"Could not parse activity of {member_display}.", ephemeral=True
)
if isinstance(parsed, tuple):
(search_artist, search_song, search_subsearch) = parsed
await ctx.respond(
"*Searching...*"
) # Must respond to interactions within 3 seconds, per Discord
await message.edit(content=f"*Searching for '{search_song}' by {search_artist}...*")
search_result: Optional[list] = await self.utility.lyric_search(
search_artist, search_song, search_subsearch
search_artist, search_song, search_subsearch, is_spam_channel=(ctx.channel.id in BOT_CHANIDS)
)
if not search_result:
await ctx.respond("ERR: No search result")
@@ -193,33 +399,24 @@ class Sing(commands.Cog):
2
] # Third index is shortened lyrics
if not IS_SPAMCHAN:
short_lyrics = " ".join(
search_result_wrapped_short
) # Replace with shortened lyrics for non spamchans
short_lyrics = regex.sub(
r"\p{Vert_Space}", " / ", short_lyrics.strip()
)
return await ctx.respond(
f"**{search_result_song}** by **{search_result_artist}**\n-# {short_lyrics}"
)
# All channels now get full paginated response
out_messages: list = []
footer: str = ""
c: int = 0
for section in search_result_wrapped:
c += 1
if c == len(search_result_wrapped):
footer = f"`Found on: {search_result_src}`"
# if ctx.guild.id == 1145182936002482196:
# section = section.upper()
section = regex.sub(r"\p{Vert_Space}", " / ", section.strip())
msg: str = f"**{search_result_song}** by **{search_result_artist}**\n-# {section}\n{footer}"
if c > 1:
msg = "\n".join(msg.split("\n")[1:])
out_messages.append(msg.strip())
for msg in out_messages:
await ctx.send(msg)
# Create paginator for lyrics
if not search_result_wrapped:
return await ctx.respond("No lyrics found.")
# Use the already smartly-wrapped pages from sing_util
pages = search_result_wrapped.copy() # Already processed by _smart_lyrics_wrap
# Add source info to last page
if pages:
pages[-1] += f"\n\n`Found on: {search_result_src}`"
# Create and send view with paginator
paginator = LyricsPaginator(pages, search_result_song, search_result_artist)
view = paginator.get_view_for_page(0)
# Edit the existing message to show the lyrics
await message.edit(content=None, embed=paginator.embeds[0], view=view)
except Exception as e:
traceback.print_exc()
return await ctx.respond(f"ERR: {str(e)}")
@@ -227,4 +424,5 @@ class Sing(commands.Cog):
def setup(bot) -> None:
"""Run on Cog Load"""
load_pagination_data() # Load existing pagination data from file
bot.add_cog(Sing(bot))

View File

@@ -25,10 +25,11 @@ cogs_list: list[str] = [
"sing",
"meme",
"lovehate",
"radio",
# "ollama",
# "radio",
]
bot_activity = discord.CustomActivity(name="I made cookies!")
bot_activity = discord.CustomActivity(name="I LIKE TURTLES")
load_dotenv()
@@ -42,7 +43,7 @@ class Havoc(bridge.Bot):
command_prefix=".",
intents=intents,
owner_ids=OWNERS,
activity=bot_activity,
activity=None,
help_command=commands.MinimalHelpCommand(),
)
self.BOT_CHANIDS = BOT_CHANIDS
@@ -83,6 +84,7 @@ class Havoc(bridge.Bot):
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Run on Bot Ready"""
await self.change_presence(activity=None)
logging.info("%s online!", self.user)

View File

@@ -25,7 +25,11 @@ async def get_now_playing() -> Optional[str]:
) as request:
request.raise_for_status()
response_json = await request.json()
artistsong = response_json.get("artistsong")
artistsong: str = "N/A - N/A"
artist: Optional[str] = response_json.get("artist")
song: Optional[str] = response_json.get("song")
if artist and song:
artistsong = f"{artist} - {song}"
return artistsong
except Exception as e:
logging.critical("Now playing retrieval failed: %s", str(e))

View File

@@ -1,10 +1,12 @@
import logging
import regex
import aiohttp
import textwrap
import traceback
import aiohttp
import regex
import discord
from discord import Activity
from typing import Optional, Union
logger = logging.getLogger(__name__)
class Utility:
@@ -14,9 +16,134 @@ class Utility:
self.api_url: str = "http://127.0.0.1:52111/lyric/search"
self.api_src: str = "DISC-HAVOC"
def _smart_lyrics_wrap(self, lyrics: str, max_length: int = 1500, single_page: bool = False, max_verses: int = 100, max_lines: int = 150) -> list[str]:
"""
Intelligently wrap lyrics to avoid breaking verses in the middle
Prioritizes keeping verses intact over page length consistency
Args:
lyrics: Raw lyrics text
max_length: Maximum character length per page (soft limit)
single_page: If True, return only the first page
Returns:
List of lyrics pages
"""
if not lyrics:
return []
# Strip markdown formatting from lyrics
lyrics = discord.utils.escape_markdown(lyrics)
verses = []
current_verse: list[str] = []
# Handle both regular newlines and zero-width space newlines
lines = lyrics.replace("\u200b\n", "\n").split("\n")
empty_line_count = 0
for line in lines:
stripped_line = line.strip()
if not stripped_line or stripped_line in ["", "\u200b"]:
empty_line_count += 1
# One empty line indicates a section break (be more aggressive)
if empty_line_count >= 1 and current_verse:
verses.append("\n".join(current_verse))
current_verse = []
empty_line_count = 0
else:
empty_line_count = 0
current_verse.append(stripped_line)
# Add the last verse if it exists
if current_verse:
verses.append("\n".join(current_verse))
# If we have too few verses (verse detection failed), fallback to line-based splitting
if len(verses) <= 1:
all_lines = lyrics.split("\n")
verses = []
current_chunk = []
for line in all_lines:
current_chunk.append(line.strip())
# Split every 8-10 lines to create artificial "verses"
if len(current_chunk) >= 8:
verses.append("\n".join(current_chunk))
current_chunk = []
# Add remaining lines
if current_chunk:
verses.append("\n".join(current_chunk))
if not verses:
return [lyrics[:max_length]]
# If single page requested, return first verse or truncated
if single_page:
result = verses[0]
if len(result) > max_length:
# Try to fit at least part of the first verse
lines_in_verse = result.split("\n")
truncated_lines = []
current_length = 0
for line in lines_in_verse:
if current_length + len(line) + 1 <= max_length - 3: # -3 for "..."
truncated_lines.append(line)
current_length += len(line) + 1
else:
break
result = "\n".join(truncated_lines) + "..." if truncated_lines else verses[0][:max_length-3] + "..."
return [result]
# Group complete verses into pages, never breaking a verse
# Limit by character count, verse count, AND line count for visual appeal
max_verses_per_page = max_verses
max_lines_per_page = max_lines
pages = []
current_page_verses: list[str] = []
current_page_length = 0
current_page_lines = 0
for verse in verses:
verse_length = len(verse)
# Count lines properly - handle both regular newlines and zero-width space newlines
verse_line_count = verse.count("\n") + verse.count("\u200b\n") + 1
# Calculate totals if we add this verse (including separator)
separator_length = 3 if current_page_verses else 0 # "\n\n\n" between verses
separator_lines = 3 if current_page_verses else 0 # 3 empty lines between verses
total_length_with_verse = current_page_length + separator_length + verse_length
total_lines_with_verse = current_page_lines + separator_lines + verse_line_count
# Check all three limits: character, verse count, and line count
exceeds_length = total_length_with_verse > max_length
exceeds_verse_count = len(current_page_verses) >= max_verses_per_page
exceeds_line_count = total_lines_with_verse > max_lines_per_page
# If adding this verse would exceed any limit AND we already have verses on the page
if (exceeds_length or exceeds_verse_count or exceeds_line_count) and current_page_verses:
# Finish current page with existing verses
pages.append("\n\n".join(current_page_verses))
current_page_verses = [verse]
current_page_length = verse_length
current_page_lines = verse_line_count
else:
# Add verse to current page
current_page_verses.append(verse)
current_page_length = total_length_with_verse
current_page_lines = total_lines_with_verse
# Add the last page if it has content
if current_page_verses:
pages.append("\n\n".join(current_page_verses))
return pages if pages else [lyrics[:max_length]]
def parse_song_input(
self, song: Optional[str] = None, activity: Optional[Activity] = None
) -> Union[bool, tuple]:
self, song: str | None = None, activity: Activity | None = None,
) -> bool | tuple:
"""
Parse Song (Sing Command) Input
@@ -36,7 +163,7 @@ class Utility:
match activity.name.lower():
case "codey toons" | "cider" | "sonixd":
search_artist: str = " ".join(
str(activity.state).strip().split(" ")[1:]
str(activity.state).strip().split(" ")[1:],
)
search_artist = regex.sub(
r"(\s{0,})(\[(spotify|tidal|sonixd|browser|yt music)])$",
@@ -51,13 +178,13 @@ class Utility:
search_song = str(activity.details)
song = f"{search_artist} : {search_song}"
case "spotify":
if not activity.title or not activity.artist: # type: ignore
if not activity.title or not activity.artist: # type: ignore[attr-defined]
"""
Attributes exist, but mypy does not recognize them. Ignored.
"""
return False
search_artist = str(activity.artist) # type: ignore
search_song = str(activity.title) # type: ignore
search_artist = str(activity.artist) # type: ignore[attr-defined]
search_song = str(activity.title) # type: ignore[attr-defined]
song = f"{search_artist} : {search_song}"
case "serious.fm" | "cocks.fm" | "something":
if not activity.details:
@@ -78,27 +205,27 @@ class Utility:
return False
search_artist = song.split(search_split_by)[0].strip()
search_song = "".join(song.split(search_split_by)[1:]).strip()
search_subsearch: Optional[str] = None
search_subsearch: str | None = None
if (
search_split_by == ":" and len(song.split(":")) > 2
): # Support sub-search if : is used (per instructions)
search_song = song.split(
search_split_by
search_split_by,
)[
1
].strip() # Reduce search_song to only the 2nd split of : [the rest is meant to be lyric text]
search_subsearch = "".join(
song.split(search_split_by)[2:]
song.split(search_split_by)[2:],
) # Lyric text from split index 2 and beyond
return (search_artist, search_song, search_subsearch)
except Exception as e:
logging.debug("Exception: %s", str(e))
logger.debug("Exception: %s", str(e))
traceback.print_exc()
return False
async def lyric_search(
self, artist: str, song: str, sub: Optional[str] = None
) -> Optional[list]:
self, artist: str, song: str, sub: str | None = None, is_spam_channel: bool = True,
) -> list | None:
"""
Lyric Search
@@ -140,7 +267,7 @@ class Utility:
return [(f"ERR: {response.get('errorText')}",)]
out_lyrics = regex.sub(
r"<br>", "\u200b\n", response.get("lyrics", "")
r"<br>", "\u200b\n", response.get("lyrics", ""),
)
response_obj: dict = {
"artist": response.get("artist"),
@@ -154,24 +281,15 @@ class Utility:
lyrics = response_obj.get("lyrics")
if not lyrics:
return None
response_obj["lyrics"] = textwrap.wrap(
text=lyrics.strip(),
width=1500,
drop_whitespace=False,
replace_whitespace=False,
break_long_words=True,
break_on_hyphens=True,
max_lines=8,
)
response_obj["lyrics_short"] = textwrap.wrap(
text=lyrics.strip(),
width=750,
drop_whitespace=False,
replace_whitespace=False,
break_long_words=True,
break_on_hyphens=True,
max_lines=1,
)
# Use different limits based on channel type
if is_spam_channel:
# Spam channels: higher limits for more content per page
response_obj["lyrics"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=8000, max_verses=100, max_lines=150)
else:
# Non-spam channels: much shorter limits for better UX in regular channels
response_obj["lyrics"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=2000, max_verses=15, max_lines=25)
response_obj["lyrics_short"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=500, single_page=True)
return [
(
@@ -186,4 +304,4 @@ class Utility:
]
except Exception as e:
traceback.print_exc()
return [f"Retrieval failed: {str(e)}"]
return [f"Retrieval failed: {e!s}"]

1508
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff