308 lines
13 KiB
Python
308 lines
13 KiB
Python
import logging
|
|
import traceback
|
|
|
|
import aiohttp
|
|
import regex
|
|
import discord
|
|
from discord import Activity
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Utility:
|
|
"""Sing Utility"""
|
|
|
|
def __init__(self) -> None:
|
|
self.api_url: str = "http://127.0.0.1:52111/lyric/search"
|
|
self.api_src: str = "DISC-HAVOC"
|
|
|
|
def _smart_lyrics_wrap(self, lyrics: str, max_length: int = 1500, single_page: bool = False, max_verses: int = 100, max_lines: int = 150) -> list[str]:
|
|
"""
|
|
Intelligently wrap lyrics to avoid breaking verses in the middle
|
|
Prioritizes keeping verses intact over page length consistency
|
|
|
|
Args:
|
|
lyrics: Raw lyrics text
|
|
max_length: Maximum character length per page (soft limit)
|
|
single_page: If True, return only the first page
|
|
|
|
Returns:
|
|
List of lyrics pages
|
|
"""
|
|
if not lyrics:
|
|
return []
|
|
|
|
# Strip markdown formatting from lyrics
|
|
lyrics = discord.utils.escape_markdown(lyrics)
|
|
verses = []
|
|
current_verse: list[str] = []
|
|
|
|
# Handle both regular newlines and zero-width space newlines
|
|
lines = lyrics.replace("\u200b\n", "\n").split("\n")
|
|
empty_line_count = 0
|
|
|
|
for line in lines:
|
|
stripped_line = line.strip()
|
|
if not stripped_line or stripped_line in ["", "\u200b"]:
|
|
empty_line_count += 1
|
|
# One empty line indicates a section break (be more aggressive)
|
|
if empty_line_count >= 1 and current_verse:
|
|
verses.append("\n".join(current_verse))
|
|
current_verse = []
|
|
empty_line_count = 0
|
|
else:
|
|
empty_line_count = 0
|
|
current_verse.append(stripped_line)
|
|
|
|
# Add the last verse if it exists
|
|
if current_verse:
|
|
verses.append("\n".join(current_verse))
|
|
|
|
# If we have too few verses (verse detection failed), fallback to line-based splitting
|
|
if len(verses) <= 1:
|
|
all_lines = lyrics.split("\n")
|
|
verses = []
|
|
current_chunk = []
|
|
|
|
for line in all_lines:
|
|
current_chunk.append(line.strip())
|
|
# Split every 8-10 lines to create artificial "verses"
|
|
if len(current_chunk) >= 8:
|
|
verses.append("\n".join(current_chunk))
|
|
current_chunk = []
|
|
|
|
# Add remaining lines
|
|
if current_chunk:
|
|
verses.append("\n".join(current_chunk))
|
|
|
|
if not verses:
|
|
return [lyrics[:max_length]]
|
|
|
|
# If single page requested, return first verse or truncated
|
|
if single_page:
|
|
result = verses[0]
|
|
if len(result) > max_length:
|
|
# Try to fit at least part of the first verse
|
|
lines_in_verse = result.split("\n")
|
|
truncated_lines = []
|
|
current_length = 0
|
|
|
|
for line in lines_in_verse:
|
|
if current_length + len(line) + 1 <= max_length - 3: # -3 for "..."
|
|
truncated_lines.append(line)
|
|
current_length += len(line) + 1
|
|
else:
|
|
break
|
|
|
|
result = "\n".join(truncated_lines) + "..." if truncated_lines else verses[0][:max_length-3] + "..."
|
|
return [result]
|
|
|
|
# Group complete verses into pages, never breaking a verse
|
|
# Limit by character count, verse count, AND line count for visual appeal
|
|
max_verses_per_page = max_verses
|
|
max_lines_per_page = max_lines
|
|
pages = []
|
|
current_page_verses: list[str] = []
|
|
current_page_length = 0
|
|
current_page_lines = 0
|
|
|
|
for verse in verses:
|
|
verse_length = len(verse)
|
|
# Count lines properly - handle both regular newlines and zero-width space newlines
|
|
verse_line_count = verse.count("\n") + verse.count("\u200b\n") + 1
|
|
|
|
# Calculate totals if we add this verse (including separator)
|
|
separator_length = 3 if current_page_verses else 0 # "\n\n\n" between verses
|
|
separator_lines = 3 if current_page_verses else 0 # 3 empty lines between verses
|
|
total_length_with_verse = current_page_length + separator_length + verse_length
|
|
total_lines_with_verse = current_page_lines + separator_lines + verse_line_count
|
|
|
|
# Check all three limits: character, verse count, and line count
|
|
exceeds_length = total_length_with_verse > max_length
|
|
exceeds_verse_count = len(current_page_verses) >= max_verses_per_page
|
|
exceeds_line_count = total_lines_with_verse > max_lines_per_page
|
|
|
|
# If adding this verse would exceed any limit AND we already have verses on the page
|
|
if (exceeds_length or exceeds_verse_count or exceeds_line_count) and current_page_verses:
|
|
# Finish current page with existing verses
|
|
pages.append("\n\n".join(current_page_verses))
|
|
current_page_verses = [verse]
|
|
current_page_length = verse_length
|
|
current_page_lines = verse_line_count
|
|
else:
|
|
# Add verse to current page
|
|
current_page_verses.append(verse)
|
|
current_page_length = total_length_with_verse
|
|
current_page_lines = total_lines_with_verse
|
|
|
|
# Add the last page if it has content
|
|
if current_page_verses:
|
|
pages.append("\n\n".join(current_page_verses))
|
|
|
|
return pages if pages else [lyrics[:max_length]]
|
|
|
|
def parse_song_input(
|
|
self, song: str | None = None, activity: Activity | None = None,
|
|
) -> bool | tuple:
|
|
"""
|
|
Parse Song (Sing Command) Input
|
|
|
|
Args:
|
|
song (Optional[str]): Song to search
|
|
activity (Optional[discord.Activity]): Discord activity, used to attempt lookup if no song is provided
|
|
|
|
Returns:
|
|
Union[bool, tuple]
|
|
"""
|
|
try:
|
|
if (not song or len(song) < 2) and not activity:
|
|
return False
|
|
if not song and activity:
|
|
if not activity.name:
|
|
return False # No valid activity found
|
|
match activity.name.lower():
|
|
case "codey toons" | "cider" | "sonixd":
|
|
search_artist: str = " ".join(
|
|
str(activity.state).strip().split(" ")[1:],
|
|
)
|
|
search_artist = regex.sub(
|
|
r"(\s{0,})(\[(spotify|tidal|sonixd|browser|yt music)])$",
|
|
"",
|
|
search_artist.strip(),
|
|
flags=regex.IGNORECASE,
|
|
)
|
|
search_song = str(activity.details)
|
|
song = f"{search_artist} : {search_song}"
|
|
case "tidal hi-fi":
|
|
search_artist = str(activity.state)
|
|
search_song = str(activity.details)
|
|
song = f"{search_artist} : {search_song}"
|
|
case "spotify":
|
|
if not activity.title or not activity.artist: # type: ignore[attr-defined]
|
|
"""
|
|
Attributes exist, but mypy does not recognize them. Ignored.
|
|
"""
|
|
return False
|
|
search_artist = str(activity.artist) # type: ignore[attr-defined]
|
|
search_song = str(activity.title) # type: ignore[attr-defined]
|
|
song = f"{search_artist} : {search_song}"
|
|
case "serious.fm" | "cocks.fm" | "something":
|
|
if not activity.details:
|
|
song = str(activity.state)
|
|
else:
|
|
search_artist = str(activity.state).rsplit("[", maxsplit=1)[
|
|
0
|
|
] # Strip genre
|
|
search_song = str(activity.details)
|
|
song = f"{search_artist} : {search_song}"
|
|
case _:
|
|
return False # Unsupported activity detected
|
|
|
|
search_split_by: str = (
|
|
":" if not (song) or len(song.split(":")) > 1 else "-"
|
|
) # Support either : or - to separate artist/track
|
|
if not song:
|
|
return False
|
|
search_artist = song.split(search_split_by)[0].strip()
|
|
search_song = "".join(song.split(search_split_by)[1:]).strip()
|
|
search_subsearch: str | None = None
|
|
if (
|
|
search_split_by == ":" and len(song.split(":")) > 2
|
|
): # Support sub-search if : is used (per instructions)
|
|
search_song = song.split(
|
|
search_split_by,
|
|
)[
|
|
1
|
|
].strip() # Reduce search_song to only the 2nd split of : [the rest is meant to be lyric text]
|
|
search_subsearch = "".join(
|
|
song.split(search_split_by)[2:],
|
|
) # Lyric text from split index 2 and beyond
|
|
return (search_artist, search_song, search_subsearch)
|
|
except Exception as e:
|
|
logger.debug("Exception: %s", str(e))
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
async def lyric_search(
|
|
self, artist: str, song: str, sub: str | None = None, is_spam_channel: bool = True,
|
|
) -> list | None:
|
|
"""
|
|
Lyric Search
|
|
|
|
Args:
|
|
artist (str): Artist to search
|
|
song (str): Song to search
|
|
sub (Optional[str]): Lyrics for subsearch
|
|
Returns:
|
|
Optional[list]
|
|
"""
|
|
try:
|
|
if not artist or not song:
|
|
return [("FAIL! Artist/Song not provided",)]
|
|
|
|
search_obj: dict = {
|
|
"a": artist.strip(),
|
|
"s": song.strip(),
|
|
"extra": True,
|
|
"src": self.api_src,
|
|
}
|
|
|
|
if len(song.strip()) < 1:
|
|
search_obj.pop("a")
|
|
search_obj.pop("s")
|
|
search_obj["t"] = artist.strip() # Parse failed, try title without sep
|
|
|
|
if sub and len(sub) >= 2:
|
|
search_obj["sub"] = sub.strip()
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with await session.post(
|
|
self.api_url,
|
|
json=search_obj,
|
|
timeout=aiohttp.ClientTimeout(connect=5, sock_read=10),
|
|
) as request:
|
|
request.raise_for_status()
|
|
response: dict = await request.json()
|
|
if response.get("err"):
|
|
return [(f"ERR: {response.get('errorText')}",)]
|
|
|
|
out_lyrics = regex.sub(
|
|
r"<br>", "\u200b\n", response.get("lyrics", ""),
|
|
)
|
|
response_obj: dict = {
|
|
"artist": response.get("artist"),
|
|
"song": response.get("song"),
|
|
"lyrics": out_lyrics,
|
|
"src": response.get("src"),
|
|
"confidence": float(response.get("confidence", 0.0)),
|
|
"time": float(response.get("time", -1.0)),
|
|
}
|
|
|
|
lyrics = response_obj.get("lyrics")
|
|
if not lyrics:
|
|
return None
|
|
# Use different limits based on channel type
|
|
if is_spam_channel:
|
|
# Spam channels: higher limits for more content per page
|
|
response_obj["lyrics"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=8000, max_verses=100, max_lines=150)
|
|
else:
|
|
# Non-spam channels: much shorter limits for better UX in regular channels
|
|
response_obj["lyrics"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=2000, max_verses=15, max_lines=25)
|
|
|
|
response_obj["lyrics_short"] = self._smart_lyrics_wrap(lyrics.strip(), max_length=500, single_page=True)
|
|
|
|
return [
|
|
(
|
|
response_obj.get("artist"),
|
|
response_obj.get("song"),
|
|
response_obj.get("src"),
|
|
f"{int(response_obj.get('confidence', -1.0))}%",
|
|
f"{response_obj.get('time', -666.0):.4f}s",
|
|
),
|
|
response_obj.get("lyrics"),
|
|
response_obj.get("lyrics_short"),
|
|
]
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
return [f"Retrieval failed: {e!s}"]
|