756 lines
34 KiB
Python
756 lines
34 KiB
Python
import io
|
|
import random
|
|
import asyncio
|
|
import logging
|
|
import traceback
|
|
from typing import Optional, TYPE_CHECKING
|
|
import discord
|
|
import requests
|
|
from discord.ext import bridge, commands
|
|
from disc_havoc import Havoc
|
|
import util
|
|
|
|
if TYPE_CHECKING:
|
|
from cogs.message_logger import MessageLogger
|
|
|
|
|
|
class Owner(commands.Cog):
|
|
"""Owner Cog for Havoc"""
|
|
|
|
def __init__(self, bot: Havoc) -> None:
|
|
self.bot: Havoc = bot
|
|
self.former_roles_store: dict[int, list[discord.Role]] = {}
|
|
self._temperature: int = random.randrange(20, 30)
|
|
|
|
@bridge.bridge_command(guild_ids=[1145182936002482196])
|
|
async def temperature(self, ctx, temp: Optional[int | str] = None) -> None:
|
|
"""
|
|
Set Temperature
|
|
"""
|
|
if not temp:
|
|
return await ctx.respond(
|
|
f"The current temperature is: {self._temperature} °C"
|
|
)
|
|
|
|
if not self.bot.is_owner(ctx.author):
|
|
return await ctx.respond("I am afraid I can't let you do that.")
|
|
try:
|
|
_temperature: int = int(temp)
|
|
except Exception as e:
|
|
logging.debug("Exception: %s", str(e))
|
|
return await ctx.respond("Invalid input")
|
|
if _temperature < -15:
|
|
return await ctx.respond("Too cold! (-15°C minimum)")
|
|
elif _temperature > 35:
|
|
return await ctx.respond("Too hot! (35°C maximum)")
|
|
self._temperature = _temperature
|
|
return await ctx.respond(
|
|
f"As per your request, I have adjusted the temperature to {_temperature} °C."
|
|
)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def editmsg(self, ctx, msgid: str, *, newcontent: str) -> None:
|
|
"""
|
|
Edit a message previously sent by the bot
|
|
"""
|
|
|
|
try:
|
|
message: Optional[discord.Message] = self.bot.get_message(int(msgid))
|
|
if not message:
|
|
await ctx.respond(
|
|
f"**Failed:** Message {msgid} not found.", ephemeral=True
|
|
)
|
|
return None
|
|
await message.edit(content=newcontent)
|
|
await ctx.respond("**Done!**", ephemeral=True)
|
|
except Exception as e:
|
|
await ctx.respond(f"**Failed:** {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def reload(self, ctx) -> None:
|
|
"""
|
|
Reload Cogs
|
|
"""
|
|
|
|
self.bot.load_exts(False)
|
|
await ctx.respond("Reloaded!", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def say(self, ctx, *, parameters: str) -> None:
|
|
"""
|
|
Make me say something in a channel
|
|
"""
|
|
_parameters: list[str] = parameters.split(" ")
|
|
|
|
if not len(_parameters) > 1:
|
|
return await ctx.respond(
|
|
"**Error**: Incorrect command usage; required: <chan> <msg>",
|
|
ephemeral=True,
|
|
)
|
|
|
|
channel: str = _parameters[0]
|
|
channel_mentions: list[int] = discord.utils.raw_channel_mentions(channel)
|
|
if channel_mentions:
|
|
channel = str(channel_mentions[0])
|
|
msg: str = " ".join(_parameters[1:])
|
|
await util.discord_helpers.send_message(self.bot, channel=channel, message=msg)
|
|
return await ctx.respond("**Done.**", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def chgstatus(self, ctx, *, status: Optional[str] = None) -> None:
|
|
"""
|
|
Change bots status
|
|
"""
|
|
if not status:
|
|
return await ctx.respond(
|
|
"ERR: No status provided to change to!", ephemeral=True
|
|
)
|
|
|
|
await self.bot.change_presence(
|
|
status=discord.Status.online,
|
|
activity=discord.CustomActivity(name=status.strip()),
|
|
)
|
|
await ctx.respond("Done!", ephemeral=True)
|
|
|
|
@commands.message_command(name="Remove Messages Starting Here")
|
|
@commands.is_owner()
|
|
async def purge(self, ctx, message: discord.Message) -> None:
|
|
"""
|
|
Purge Messages
|
|
|
|
Args:
|
|
ctx (Any): Discord context
|
|
message (discord.Message): Discord message
|
|
Returns:
|
|
None
|
|
"""
|
|
try:
|
|
await ctx.channel.purge(
|
|
after=message,
|
|
bulk=True,
|
|
limit=900000,
|
|
reason=f"Purge initiated by {ctx.author.display_name}",
|
|
)
|
|
await message.delete(reason=f"Purge initiated by {ctx.author.display_name}")
|
|
await ctx.respond("**Done!**")
|
|
# Wait 3 seconds, then delete interaction
|
|
await asyncio.sleep(3)
|
|
interaction = await ctx.interaction.original_response()
|
|
await interaction.delete()
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
return await ctx.respond(f"**ERR: {str(e)}**")
|
|
|
|
@commands.message_command(name="Move to Memes")
|
|
@commands.is_owner()
|
|
async def movememe(self, ctx, message: discord.Message) -> None:
|
|
"""
|
|
Move to Memes
|
|
|
|
Args:
|
|
ctx (Any): Discord context
|
|
message (discord.Message): Discord message
|
|
Returns:
|
|
None
|
|
"""
|
|
try:
|
|
if not isinstance(message.channel, discord.TextChannel):
|
|
return
|
|
memes_channel: discord.TextChannel = ctx.guild.get_channel(
|
|
1147229098544988261
|
|
)
|
|
message_content: str = message.content
|
|
message_author: str = message.author.display_name
|
|
message_channel: str = message.channel.name
|
|
_file: Optional[discord.File] = None
|
|
if message.attachments:
|
|
for item in message.attachments:
|
|
if item.url and len(item.url) >= 20:
|
|
response = requests.get(item.url, stream=True, timeout=20)
|
|
image_data: bytes = response.raw.read() or b''
|
|
image: io.BytesIO = io.BytesIO(image_data)
|
|
ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
|
|
_file = discord.File(image, filename=f"img.{ext}")
|
|
if not _file:
|
|
return # No file to move
|
|
await memes_channel.send(
|
|
f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...*\n**{message_author}:** {message_content}",
|
|
file=_file,
|
|
)
|
|
await message.delete()
|
|
await ctx.respond("OK!", ephemeral=True)
|
|
except Exception as e:
|
|
logging.debug("Exception: %s", str(e))
|
|
traceback.print_exc()
|
|
return await ctx.respond("Failed! :(", ephemeral=True)
|
|
|
|
@commands.message_command(name="Move to Drugs")
|
|
@commands.is_owner()
|
|
async def movedrugs(self, ctx, message: discord.Message) -> None:
|
|
"""
|
|
Move to Drugs
|
|
|
|
Args:
|
|
ctx (Any): Discord context
|
|
message (discord.Message): Discord message
|
|
Returns:
|
|
None
|
|
"""
|
|
try:
|
|
if not isinstance(message.channel, discord.TextChannel):
|
|
return
|
|
drugs_channel: discord.TextChannel = ctx.guild.get_channel(
|
|
1172247451047034910
|
|
)
|
|
message_content: str = message.content
|
|
message_author: str = message.author.display_name
|
|
message_channel: str = message.channel.name
|
|
_file: Optional[discord.File] = None
|
|
if message.attachments:
|
|
for item in message.attachments:
|
|
if item.url and len(item.url) >= 20:
|
|
response = requests.get(item.url, stream=True, timeout=20)
|
|
image_data: bytes = response.raw.read() or b''
|
|
image: io.BytesIO = io.BytesIO(image_data)
|
|
ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
|
|
_file = discord.File(image, filename=f"img.{ext}")
|
|
if not _file:
|
|
return # No file to move
|
|
await drugs_channel.send(
|
|
f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...\
|
|
*\n**{message_author}:** {message_content}",
|
|
file=_file,
|
|
)
|
|
await message.delete()
|
|
await ctx.respond("OK!", ephemeral=True)
|
|
except Exception as e:
|
|
logging.debug("Exception: %s", str(e))
|
|
traceback.print_exc()
|
|
return await ctx.respond("Failed! :(", ephemeral=True)
|
|
|
|
@commands.message_command(name="Move to fun-house")
|
|
@commands.is_owner()
|
|
async def movefunhouse(self, ctx, message: discord.Message) -> None:
|
|
"""
|
|
Move to fun-house
|
|
|
|
Args:
|
|
ctx (Any): Discord context
|
|
message (discord.Message): Discord message
|
|
Returns:
|
|
None
|
|
"""
|
|
try:
|
|
if not isinstance(message.channel, discord.TextChannel):
|
|
return
|
|
funhouse_channel: discord.TextChannel = ctx.guild.get_channel(
|
|
1213160512364478607
|
|
)
|
|
message_content: str = message.content
|
|
message_author: str = message.author.display_name
|
|
message_channel: str = message.channel.name
|
|
_file: Optional[discord.File] = None
|
|
if message.attachments:
|
|
for item in message.attachments:
|
|
if item.url and len(item.url) >= 20:
|
|
response = requests.get(item.url, stream=True, timeout=20)
|
|
image_data: bytes = response.raw.read() or b''
|
|
image: io.BytesIO = io.BytesIO(image_data)
|
|
ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
|
|
_file = discord.File(image, filename=f"img.{ext}")
|
|
await funhouse_channel.send(
|
|
f"*Performing bureaucratic duties (this didn't belong in #{message_channel})\
|
|
...*\n**{message_author}:** {message_content}"
|
|
)
|
|
await message.delete()
|
|
await ctx.respond("OK!", ephemeral=True)
|
|
except Exception as e:
|
|
logging.debug("Exception: %s", str(e))
|
|
traceback.print_exc()
|
|
return await ctx.respond("Failed! :(", ephemeral=True)
|
|
|
|
@commands.user_command(name="Einsperren!", guild_ids=[145182936002482196])
|
|
@commands.is_owner()
|
|
async def einsperren(self, ctx, member: discord.Member) -> None:
|
|
"""
|
|
Einsperren!
|
|
|
|
Args:
|
|
ctx (Any): Discord context
|
|
member (discord.Member): Discord member
|
|
Returns:
|
|
None
|
|
"""
|
|
try:
|
|
if not ctx.guild.id == 1145182936002482196:
|
|
return # Not home server!
|
|
if not member.roles:
|
|
return # No roles
|
|
audit_reason: str = f"Einsperren von {ctx.user.display_name}"
|
|
member = ctx.guild.get_member(member.id)
|
|
member_display: str = member.display_name
|
|
einsperren_role: discord.Role = (
|
|
ctx.guild.get_role(1235415059300093973)
|
|
if ctx.guild.id != 1145182936002482196
|
|
else ctx.guild.get_role(1235406301614309386)
|
|
)
|
|
member_roles: list = [
|
|
role for role in member.roles if not role.name == "@everyone"
|
|
]
|
|
member_role_names: list[str] = [
|
|
str(role.name).lower() for role in member_roles
|
|
]
|
|
opers_chan: discord.TextChannel = ctx.guild.get_channel(1181416083287187546)
|
|
if "einsperren" not in member_role_names:
|
|
try:
|
|
if member.id in self.former_roles_store:
|
|
self.former_roles_store.pop(member.id)
|
|
self.former_roles_store[member.id] = member.roles
|
|
except: # noqa
|
|
"""Safe to ignore"""
|
|
pass
|
|
try:
|
|
await member.edit(roles=[einsperren_role], reason=audit_reason)
|
|
await ctx.respond(
|
|
f"Gesendet {member_display} an einsperren.", ephemeral=True
|
|
)
|
|
await opers_chan.send(
|
|
f"@everyone: {ctx.user.display_name} gesendet {member_display} an einsperren."
|
|
)
|
|
except Exception as e:
|
|
logging.debug("Exception: %s", str(e))
|
|
traceback.print_exc()
|
|
return await ctx.respond("GOTTVERDAMMT!!", ephemeral=True)
|
|
self.former_roles_store[member.id] = member.roles
|
|
|
|
if member.id not in self.former_roles_store:
|
|
await member.edit(roles=[]) # No roles
|
|
else:
|
|
former_roles: list = self.former_roles_store.get(member.id, [0])
|
|
await member.edit(roles=former_roles, reason=f"De-{audit_reason}")
|
|
|
|
await ctx.respond(
|
|
f"{member_display} wurde von der Einsperre befreit.", ephemeral=True
|
|
)
|
|
await opers_chan.send(
|
|
f"{member_display} wurde von {ctx.user.display_name} aus der Einsperre befreit."
|
|
)
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
return await ctx.respond(f"ERR: {str(e)}", ephemeral=True)
|
|
|
|
def _get_message_logger(self) -> Optional["MessageLogger"]:
|
|
"""Get the MessageLogger cog with proper typing."""
|
|
cog = self.bot.get_cog('MessageLogger')
|
|
if cog is None:
|
|
return None
|
|
return cog # type: ignore[return-value]
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def db_stats(self, ctx) -> None:
|
|
"""Get database statistics including image cache info."""
|
|
try:
|
|
# Get the message logger cog to access the database
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
stats = await logger_cog.db.get_stats()
|
|
uncached = await logger_cog.db.get_uncached_counts()
|
|
|
|
embed = discord.Embed(title="Database Statistics", color=discord.Color.blue())
|
|
embed.add_field(name="Messages", value=f"{stats.get('total_messages', 0):,}", inline=True)
|
|
embed.add_field(name="Users", value=f"{stats.get('total_users', 0):,}", inline=True)
|
|
embed.add_field(name="Guilds", value=f"{stats.get('total_guilds', 0):,}", inline=True)
|
|
embed.add_field(name="Channels", value=f"{stats.get('total_channels', 0):,}", inline=True)
|
|
embed.add_field(name="Attachments", value=f"{stats.get('total_attachments', 0):,}", inline=True)
|
|
embed.add_field(name="Reactions", value=f"{stats.get('total_reactions', 0):,}", inline=True)
|
|
embed.add_field(name="Cached Images", value=f"{stats.get('cached_images', 0):,}", inline=True)
|
|
embed.add_field(name="Cache Size", value=f"{stats.get('cached_images_size_mb', 0):.2f} MB", inline=True)
|
|
embed.add_field(name="Deleted Messages", value=f"{stats.get('deleted_messages', 0):,}", inline=True)
|
|
|
|
# Uncached counts
|
|
uncached_text = (
|
|
f"Avatars: {uncached.get('users_missing_avatars', 0):,}\n"
|
|
f"Banners: {uncached.get('users_missing_banners', 0):,}\n"
|
|
f"Guild Avatars: {uncached.get('members_missing_guild_avatars', 0):,}\n"
|
|
f"Attachments: {uncached.get('attachments_missing_cache', 0):,}"
|
|
)
|
|
embed.add_field(name="Uncached Images", value=uncached_text, inline=False)
|
|
|
|
# Cache breakdown by type
|
|
if stats.get('cached_by_type'):
|
|
breakdown = "\n".join(f"{k}: {v:,}" for k, v in stats['cached_by_type'].items())
|
|
embed.add_field(name="Cache by Type", value=breakdown or "None", inline=False)
|
|
|
|
await ctx.respond(embed=embed)
|
|
except Exception as e:
|
|
logging.error(f"Error getting db stats: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def backfill_avatars(self, ctx, batch_size: int = 50, max_batches: int = 10) -> None:
|
|
"""Backfill missing avatar images into the cache."""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
await ctx.respond(f"Starting avatar backfill (batch_size={batch_size}, max_batches={max_batches})...")
|
|
|
|
stats = await logger_cog.db.backfill_missing_avatars(batch_size, max_batches)
|
|
|
|
embed = discord.Embed(title="Avatar Backfill Complete", color=discord.Color.green())
|
|
embed.add_field(name="Avatars", value=f"{stats['avatars_cached']}/{stats['avatars_processed']} cached", inline=True)
|
|
embed.add_field(name="Banners", value=f"{stats['banners_cached']}/{stats['banners_processed']} cached", inline=True)
|
|
embed.add_field(name="Guild Avatars", value=f"{stats['guild_avatars_cached']}/{stats['guild_avatars_processed']} cached", inline=True)
|
|
|
|
await ctx.respond(embed=embed)
|
|
except Exception as e:
|
|
logging.error(f"Error in backfill_avatars: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def backfill_attachments(self, ctx, batch_size: int = 50, max_batches: int = 10) -> None:
|
|
"""Backfill missing attachment images into the cache."""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
await ctx.respond(f"Starting attachment backfill (batch_size={batch_size}, max_batches={max_batches})...")
|
|
|
|
stats = await logger_cog.db.backfill_missing_attachments(batch_size, max_batches)
|
|
|
|
embed = discord.Embed(title="Attachment Backfill Complete", color=discord.Color.green())
|
|
embed.add_field(name="Attachments", value=f"{stats['attachments_cached']}/{stats['attachments_processed']} cached", inline=True)
|
|
|
|
await ctx.respond(embed=embed)
|
|
except Exception as e:
|
|
logging.error(f"Error in backfill_attachments: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def reset_history(self, ctx, guild_id: Optional[str] = None) -> None:
|
|
"""Reset history fetch progress to re-fetch messages (and reactions)."""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
pool = logger_cog.db.pool
|
|
assert pool is not None # Already checked above
|
|
async with pool.acquire() as conn:
|
|
if guild_id:
|
|
# Reset for specific guild
|
|
gid = int(guild_id)
|
|
await conn.execute("""
|
|
UPDATE history_fetch_progress SET is_complete = FALSE, messages_fetched = 0
|
|
WHERE channel_id IN (SELECT channel_id FROM channels WHERE guild_id = $1)
|
|
""", gid)
|
|
await ctx.respond(f"Reset history progress for guild {guild_id}. Re-queueing channels...")
|
|
|
|
# Re-queue channels for this guild
|
|
guild = self.bot.get_guild(gid)
|
|
if guild:
|
|
for channel in guild.channels:
|
|
if isinstance(channel, (discord.TextChannel, discord.Thread)):
|
|
await logger_cog.history_fetch_queue.put(channel)
|
|
await ctx.respond(f"Queued {len([c for c in guild.channels if isinstance(c, (discord.TextChannel, discord.Thread))])} channels for re-fetch.")
|
|
else:
|
|
# Reset all
|
|
await conn.execute("UPDATE history_fetch_progress SET is_complete = FALSE, messages_fetched = 0")
|
|
await ctx.respond("Reset all history progress. Re-queueing all channels...")
|
|
|
|
# Re-queue all channels
|
|
await logger_cog._queue_all_channels_for_history()
|
|
await ctx.respond(f"Queued {logger_cog.history_fetch_queue.qsize()} channels for re-fetch.")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error in reset_history: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def history_status(self, ctx) -> None:
|
|
"""Check the status of history fetching."""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
queue_size = logger_cog.history_fetch_queue.qsize()
|
|
is_fetching = logger_cog.is_fetching_history
|
|
|
|
pool = logger_cog.db.pool
|
|
assert pool is not None # Already checked above
|
|
async with pool.acquire() as conn:
|
|
total_channels = await conn.fetchval("SELECT COUNT(*) FROM history_fetch_progress")
|
|
complete = await conn.fetchval("SELECT COUNT(*) FROM history_fetch_progress WHERE is_complete = TRUE")
|
|
incomplete = await conn.fetchval("SELECT COUNT(*) FROM history_fetch_progress WHERE is_complete = FALSE")
|
|
total_fetched = await conn.fetchval("SELECT SUM(messages_fetched) FROM history_fetch_progress")
|
|
|
|
embed = discord.Embed(title="History Fetch Status", color=discord.Color.blue())
|
|
embed.add_field(name="Queue Size", value=f"{queue_size} channels", inline=True)
|
|
embed.add_field(name="Is Fetching", value="Yes" if is_fetching else "No", inline=True)
|
|
embed.add_field(name="Channels Tracked", value=f"{total_channels or 0}", inline=True)
|
|
embed.add_field(name="Complete", value=f"{complete or 0}", inline=True)
|
|
embed.add_field(name="Incomplete", value=f"{incomplete or 0}", inline=True)
|
|
embed.add_field(name="Messages Fetched", value=f"{total_fetched or 0:,}", inline=True)
|
|
|
|
await ctx.respond(embed=embed)
|
|
except Exception as e:
|
|
logging.error(f"Error in history_status: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def video_status(self, ctx) -> None:
|
|
"""Check video caching status and failures."""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
counts = await logger_cog.db.get_uncached_video_counts()
|
|
|
|
embed = discord.Embed(title="Video Cache Status", color=discord.Color.blue())
|
|
embed.add_field(name="Cached Videos", value=f"{counts.get('total_cached_videos', 0):,}", inline=True)
|
|
embed.add_field(name="Total Size", value=f"{counts.get('total_cached_videos_size_gb', 0):.2f} GB", inline=True)
|
|
embed.add_field(name="Failed Downloads", value=f"{counts.get('failed_video_downloads', 0):,}", inline=True)
|
|
embed.add_field(name="Attachments Pending", value=f"{counts.get('attachments_missing_video_cache', 0):,}", inline=True)
|
|
embed.add_field(name="Embeds Pending", value=f"{counts.get('embeds_missing_video_cache', 0):,}", inline=True)
|
|
|
|
await ctx.respond(embed=embed)
|
|
except Exception as e:
|
|
logging.error(f"Error in video_status: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def video_failures(self, ctx, limit: int = 10) -> None:
|
|
"""Show recent video download failures."""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
failures = await logger_cog.db.get_video_download_failures(limit)
|
|
|
|
if not failures:
|
|
return await ctx.respond("No video download failures recorded.", ephemeral=True)
|
|
|
|
lines = []
|
|
for f in failures:
|
|
url = f['source_url'][:60] + "..." if len(f['source_url']) > 60 else f['source_url']
|
|
error = f['error_message'][:80] + "..." if f['error_message'] and len(f['error_message']) > 80 else f['error_message']
|
|
lines.append(f"**{f['source_type']}** (x{f['error_count']}): `{url}`\n └ {error}")
|
|
|
|
content = "\n".join(lines[:10]) # Limit to avoid message too long
|
|
embed = discord.Embed(title=f"Video Failures (showing {len(failures)})", description=content, color=discord.Color.red())
|
|
await ctx.respond(embed=embed)
|
|
except Exception as e:
|
|
logging.error(f"Error in video_failures: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def video_retry(self, ctx, older_than_days: int = 7) -> None:
|
|
"""Clear old video failures to allow retrying downloads."""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
deleted = await logger_cog.db.clear_video_failures(older_than_days)
|
|
await ctx.respond(f"Cleared {deleted} video failures older than {older_than_days} days. They will be retried on next backfill cycle.", ephemeral=True)
|
|
except Exception as e:
|
|
logging.error(f"Error in video_retry: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def video_retry_all(self, ctx) -> None:
|
|
"""Clear ALL video failures to retry everything."""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
pool = logger_cog.db.pool
|
|
async with pool.acquire() as conn:
|
|
result = await conn.execute("DELETE FROM video_download_failures")
|
|
try:
|
|
deleted = int(result.split()[-1])
|
|
except Exception:
|
|
deleted = 0
|
|
|
|
await ctx.respond(f"Cleared {deleted} video failures. All will be retried on next backfill cycle.", ephemeral=True)
|
|
except Exception as e:
|
|
logging.error(f"Error in video_retry_all: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def video_retry_fetch(self, ctx, limit: int = 20) -> None:
|
|
"""Attempt to refresh attachment URLs for recent failed attachment downloads.
|
|
|
|
This will look up failures for source_type='attachment', find the related message
|
|
and attachment by filename, update the URL in the DB, and remove the failure
|
|
so the next backfill cycle retries it.
|
|
"""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
pool = logger_cog.db.pool
|
|
async with pool.acquire() as conn:
|
|
failures = await conn.fetch("SELECT source_url, source_type FROM video_download_failures WHERE source_type = 'attachment' ORDER BY last_failure DESC LIMIT $1", limit)
|
|
|
|
if not failures:
|
|
return await ctx.respond("No recent attachment download failures found.", ephemeral=True)
|
|
|
|
found = 0
|
|
updated = 0
|
|
not_found = []
|
|
|
|
for f in failures:
|
|
found += 1
|
|
src_url = f['source_url']
|
|
# Find attachment row matching this url
|
|
async with pool.acquire() as conn:
|
|
row = await conn.fetchrow("SELECT attachment_id, message_id, filename FROM attachments WHERE url = $1 LIMIT 1", src_url)
|
|
|
|
if not row:
|
|
not_found.append(src_url)
|
|
continue
|
|
|
|
attachment_id = row['attachment_id']
|
|
message_id = row['message_id']
|
|
filename = row['filename']
|
|
|
|
# Lookup message channel
|
|
async with pool.acquire() as conn:
|
|
ch_row = await conn.fetchrow("SELECT channel_id FROM messages WHERE message_id = $1 LIMIT 1", message_id)
|
|
|
|
if not ch_row or not ch_row.get('channel_id'):
|
|
not_found.append(src_url)
|
|
continue
|
|
|
|
channel_id = ch_row['channel_id']
|
|
channel = self.bot.get_channel(channel_id)
|
|
if not channel:
|
|
not_found.append(src_url)
|
|
continue
|
|
|
|
try:
|
|
# channel might be a cached object without fetch_message; try to fetch if necessary
|
|
if not hasattr(channel, 'fetch_message'):
|
|
try:
|
|
channel = await self.bot.fetch_channel(channel_id)
|
|
except Exception:
|
|
raise
|
|
|
|
# Use getattr and hasattr so static analyzers don't complain
|
|
if hasattr(channel, 'fetch_message'):
|
|
fetch = getattr(channel, 'fetch_message')
|
|
message = await fetch(message_id)
|
|
else:
|
|
raise RuntimeError("channel doesn't support fetch_message")
|
|
except Exception:
|
|
not_found.append(src_url)
|
|
continue
|
|
|
|
# Find updated attachment by filename
|
|
new_url = None
|
|
for att in message.attachments:
|
|
if att.filename == filename:
|
|
new_url = att.url
|
|
break
|
|
|
|
if not new_url:
|
|
not_found.append(src_url)
|
|
continue
|
|
|
|
# Update DB with new URL and remove failure
|
|
async with pool.acquire() as conn:
|
|
await conn.execute("UPDATE attachments SET url = $1 WHERE attachment_id = $2", new_url, attachment_id)
|
|
await conn.execute("DELETE FROM video_download_failures WHERE source_url = $1", src_url)
|
|
updated += 1
|
|
|
|
resp = f"Scanned {found} failures; updated {updated} attachment URLs; {len(not_found)} not updated."
|
|
if not_found:
|
|
resp += "\nNot updated examples: " + ", ".join(not_found[:3])
|
|
|
|
await ctx.respond(resp, ephemeral=True)
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error in video_retry_fetch: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
@bridge.bridge_command()
|
|
@commands.is_owner()
|
|
async def video_backfill(self, ctx, batch_size: int = 10, max_batches: int = 0) -> None:
|
|
"""Trigger a video backfill run. Set max_batches=0 (default) for unlimited.
|
|
|
|
Usage:
|
|
.video_backfill (unlimited with batch_size=10)
|
|
.video_backfill 25 (unlimited with batch_size=25)
|
|
.video_backfill 10 100 (batch_size=10, max_batches=100)
|
|
"""
|
|
try:
|
|
logger_cog = self._get_message_logger()
|
|
if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
|
|
return await ctx.respond("Database not available.", ephemeral=True)
|
|
|
|
# Check counts first
|
|
counts = await logger_cog.db.get_uncached_video_counts()
|
|
msg_pending = counts.get('messages_with_uncached_videos', 0)
|
|
att_pending = counts.get('attachments_missing_video_cache', 0)
|
|
emb_pending = counts.get('embeds_missing_video_cache', 0)
|
|
|
|
if msg_pending == 0:
|
|
return await ctx.respond(f"No videos need caching. (Cached: {counts.get('total_cached_videos', 0)}, Failed: {counts.get('failed_video_downloads', 0)})", ephemeral=True)
|
|
|
|
# If max_batches=0, pass None to mean unlimited batches
|
|
cap = None if max_batches == 0 else max_batches
|
|
|
|
await ctx.respond(f"Starting video backfill (batch_size={batch_size}, max_batches={'unlimited' if cap is None else max_batches})...\nMessages with uncached videos: {msg_pending} ({att_pending} attachments, {emb_pending} embeds)", ephemeral=True)
|
|
|
|
stats = await logger_cog.db.backfill_missing_videos(batch_size=batch_size, max_batches=cap)
|
|
|
|
embed = discord.Embed(title="Video Backfill Complete", color=discord.Color.green())
|
|
embed.add_field(name="Attachments Processed", value=str(stats.get('attachments_processed', 0)), inline=True)
|
|
embed.add_field(name="Attachments Cached", value=str(stats.get('attachments_cached', 0)), inline=True)
|
|
embed.add_field(name="Embeds Processed", value=str(stats.get('embeds_processed', 0)), inline=True)
|
|
embed.add_field(name="Embeds Cached", value=str(stats.get('embeds_cached', 0)), inline=True)
|
|
await ctx.respond(embed=embed, ephemeral=True)
|
|
except Exception as e:
|
|
logging.error(f"Error in video_backfill: {e}")
|
|
traceback.print_exc()
|
|
await ctx.respond(f"Error: {str(e)}", ephemeral=True)
|
|
|
|
|
|
def setup(bot) -> None:
|
|
"""Run on Cog Load"""
|
|
bot.add_cog(Owner(bot))
|