# Files
# discord-havoc/cogs/owner.py
# 2026-02-05 07:49:44 -05:00
#
# 756 lines
# 34 KiB
# Python

import io
import random
import asyncio
import logging
import traceback
from typing import Optional, TYPE_CHECKING
import discord
import requests
from discord.ext import bridge, commands
from disc_havoc import Havoc
import util
if TYPE_CHECKING:
from cogs.message_logger import MessageLogger
class Owner(commands.Cog):
    """
    Owner Cog for Havoc

    Keeps a reference to the bot plus two pieces of in-memory state: a
    per-member store of saved roles and the current temperature value.
    """

    def __init__(self, bot: Havoc) -> None:
        """
        Set up the cog state
        Args:
            bot (Havoc): Bot instance this cog is attached to
        """
        # Random starting temperature in 20..29 (randrange excludes the stop value)
        self._temperature: int = random.randrange(20, 30)
        # Maps a member ID to the list of roles saved for that member
        self.former_roles_store: dict[int, list[discord.Role]] = {}
        self.bot: Havoc = bot
@bridge.bridge_command(guild_ids=[1145182936002482196])
async def temperature(self, ctx, temp: Optional[int | str] = None) -> None:
    """
    Show or set the temperature
    Without an argument the current temperature is reported. With an
    argument, the bot owner can set a new value between -15 and 35 °C.
    Args:
        ctx (Any): Discord context
        temp (Optional[int | str]): New temperature; omit to query the current one
    Returns:
        None
    """
    # Test for "no argument" explicitly: a plain truthiness check would
    # treat a requested temperature of 0 as if nothing had been passed.
    if temp is None or temp == "":
        return await ctx.respond(
            f"The current temperature is: {self._temperature} °C"
        )
    # is_owner() is a coroutine. Without the await, the coroutine object
    # (always truthy) made this guard pass for every user.
    if not await self.bot.is_owner(ctx.author):
        return await ctx.respond("I am afraid I can't let you do that.")
    try:
        _temperature: int = int(temp)
    except (TypeError, ValueError) as e:
        # Only conversion failures are expected here
        logging.debug("Exception: %s", str(e))
        return await ctx.respond("Invalid input")
    if _temperature < -15:
        return await ctx.respond("Too cold! (-15°C minimum)")
    if _temperature > 35:
        return await ctx.respond("Too hot! (35°C maximum)")
    self._temperature = _temperature
    return await ctx.respond(
        f"As per your request, I have adjusted the temperature to {_temperature} °C."
    )
@bridge.bridge_command()
@commands.is_owner()
async def editmsg(self, ctx, msgid: str, *, newcontent: str) -> None:
    """
    Edit a message previously sent by the bot
    Args:
        ctx (Any): Discord context
        msgid (str): ID of the message to edit (converted with int())
        newcontent (str): Replacement message content
    Returns:
        None
    """
    try:
        target: Optional[discord.Message] = self.bot.get_message(int(msgid))
        if target:
            await target.edit(content=newcontent)
            await ctx.respond("**Done!**", ephemeral=True)
            return None
        # Lookup came back empty
        await ctx.respond(
            f"**Failed:** Message {msgid} not found.", ephemeral=True
        )
    except Exception as err:
        # Covers a non-numeric msgid as well as failures while editing
        await ctx.respond(f"**Failed:** {str(err)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def reload(self, ctx) -> None:
    """
    Reload Cogs
    Calls the bot's load_exts(False) and then confirms to the caller.
    Args:
        ctx (Any): Discord context
    Returns:
        None
    """
    bot = self.bot
    bot.load_exts(False)
    await ctx.respond("Reloaded!", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def say(self, ctx, *, parameters: str) -> None:
    """
    Make me say something in a channel
    Args:
        ctx (Any): Discord context
        parameters (str): "<chan> <msg>"; a channel mention (<#id>) in the
            first position is reduced to its numeric ID before sending
    Returns:
        None
    """
    # Split once on the first run of whitespace. Splitting on every single
    # space produced empty fragments for leading or repeated spaces, and
    # let "chan " through with an empty message.
    _parameters: list[str] = parameters.split(maxsplit=1)
    if len(_parameters) < 2:
        return await ctx.respond(
            "**Error**: Incorrect command usage; required: <chan> <msg>",
            ephemeral=True,
        )
    channel, msg = _parameters
    channel_mentions: list[int] = discord.utils.raw_channel_mentions(channel)
    if channel_mentions:
        channel = str(channel_mentions[0])
    await util.discord_helpers.send_message(self.bot, channel=channel, message=msg)
    return await ctx.respond("**Done.**", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def chgstatus(self, ctx, *, status: Optional[str] = None) -> None:
    """
    Change bots status
    Args:
        ctx (Any): Discord context
        status (Optional[str]): Text for the custom activity; surrounding
            whitespace is stripped
    Returns:
        None
    """
    if status:
        new_activity = discord.CustomActivity(name=status.strip())
        await self.bot.change_presence(
            status=discord.Status.online,
            activity=new_activity,
        )
        await ctx.respond("Done!", ephemeral=True)
        return
    # Nothing (or an empty string) was supplied
    return await ctx.respond(
        "ERR: No status provided to change to!", ephemeral=True
    )
@commands.message_command(name="Remove Messages Starting Here")
@commands.is_owner()
async def purge(self, ctx, message: discord.Message) -> None:
    """
    Purge Messages
    Purges the channel with after=<selected message>, deletes the selected
    message itself, and removes the confirmation after three seconds.
    Args:
        ctx (Any): Discord context
        message (discord.Message): Discord message
    Returns:
        None
    """
    try:
        audit_reason = f"Purge initiated by {ctx.author.display_name}"
        await ctx.channel.purge(
            after=message,
            bulk=True,
            limit=900000,
            reason=audit_reason,
        )
        await message.delete(reason=audit_reason)
        await ctx.respond("**Done!**")
        # Keep the confirmation visible briefly, then clean it up
        await asyncio.sleep(3)
        confirmation = await ctx.interaction.original_response()
        await confirmation.delete()
    except Exception as err:
        traceback.print_exc()
        return await ctx.respond(f"**ERR: {str(err)}**")
@commands.message_command(name="Move to Memes")
@commands.is_owner()
async def movememe(self, ctx, message: discord.Message) -> None:
    """
    Move to Memes
    Re-posts the selected message (text plus every attachment) in the
    memes channel and then deletes the original. Messages without a
    usable attachment are left alone.
    Args:
        ctx (Any): Discord context
        message (discord.Message): Discord message
    Returns:
        None
    """
    try:
        if not isinstance(message.channel, discord.TextChannel):
            # Answer the interaction instead of returning silently
            return await ctx.respond(
                "This only works in text channels.", ephemeral=True
            )
        memes_channel: Optional[discord.TextChannel] = ctx.guild.get_channel(
            1147229098544988261
        )
        if memes_channel is None:
            return await ctx.respond("Failed! :(", ephemeral=True)
        message_content: str = message.content
        message_author: str = message.author.display_name
        message_channel: str = message.channel.name
        # Collect every attachment. Keeping only the last one would lose
        # the others once the original message is deleted below.
        files: list[discord.File] = []
        for item in message.attachments:
            if not item.url or len(item.url) < 20:
                continue
            # Attachment.read() is a coroutine, so the download no longer
            # blocks the event loop like the synchronous requests.get() did,
            # and there is no streamed response left unclosed.
            image_data: bytes = await item.read() or b''
            ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
            # First file keeps the historical name "img.<ext>"; later ones
            # get a numeric suffix so names stay distinct.
            stem: str = "img" if not files else f"img{len(files)}"
            files.append(
                discord.File(io.BytesIO(image_data), filename=f"{stem}.{ext}")
            )
        if not files:
            return await ctx.respond("No file to move.", ephemeral=True)
        await memes_channel.send(
            f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...*"
            f"\n**{message_author}:** {message_content}",
            files=files,
        )
        await message.delete()
        await ctx.respond("OK!", ephemeral=True)
    except Exception as e:
        logging.debug("Exception: %s", str(e))
        traceback.print_exc()
        return await ctx.respond("Failed! :(", ephemeral=True)
@commands.message_command(name="Move to Drugs")
@commands.is_owner()
async def movedrugs(self, ctx, message: discord.Message) -> None:
    """
    Move to Drugs
    Re-posts the selected message (text plus every attachment) in the
    drugs channel and then deletes the original. Messages without a
    usable attachment are left alone.
    Args:
        ctx (Any): Discord context
        message (discord.Message): Discord message
    Returns:
        None
    """
    try:
        if not isinstance(message.channel, discord.TextChannel):
            # Answer the interaction instead of returning silently
            return await ctx.respond(
                "This only works in text channels.", ephemeral=True
            )
        drugs_channel: Optional[discord.TextChannel] = ctx.guild.get_channel(
            1172247451047034910
        )
        if drugs_channel is None:
            return await ctx.respond("Failed! :(", ephemeral=True)
        message_content: str = message.content
        message_author: str = message.author.display_name
        message_channel: str = message.channel.name
        # Collect every attachment. Keeping only the last one would lose
        # the others once the original message is deleted below.
        files: list[discord.File] = []
        for item in message.attachments:
            if not item.url or len(item.url) < 20:
                continue
            # Attachment.read() is a coroutine, so the download no longer
            # blocks the event loop like the synchronous requests.get() did,
            # and there is no streamed response left unclosed.
            image_data: bytes = await item.read() or b''
            ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
            # First file keeps the historical name "img.<ext>"; later ones
            # get a numeric suffix so names stay distinct.
            stem: str = "img" if not files else f"img{len(files)}"
            files.append(
                discord.File(io.BytesIO(image_data), filename=f"{stem}.{ext}")
            )
        if not files:
            return await ctx.respond("No file to move.", ephemeral=True)
        await drugs_channel.send(
            f"*Performing bureaucratic duties (this didn't belong in #{message_channel})...*"
            f"\n**{message_author}:** {message_content}",
            files=files,
        )
        await message.delete()
        await ctx.respond("OK!", ephemeral=True)
    except Exception as e:
        logging.debug("Exception: %s", str(e))
        traceback.print_exc()
        return await ctx.respond("Failed! :(", ephemeral=True)
@commands.message_command(name="Move to fun-house")
@commands.is_owner()
async def movefunhouse(self, ctx, message: discord.Message) -> None:
    """
    Move to fun-house
    Re-posts the selected message in the fun-house channel and then
    deletes the original. Unlike the other move commands, text-only
    messages are moved too; attachments are carried over when present.
    Args:
        ctx (Any): Discord context
        message (discord.Message): Discord message
    Returns:
        None
    """
    try:
        if not isinstance(message.channel, discord.TextChannel):
            # Answer the interaction instead of returning silently
            return await ctx.respond(
                "This only works in text channels.", ephemeral=True
            )
        funhouse_channel: Optional[discord.TextChannel] = ctx.guild.get_channel(
            1213160512364478607
        )
        if funhouse_channel is None:
            return await ctx.respond("Failed! :(", ephemeral=True)
        message_content: str = message.content
        message_author: str = message.author.display_name
        message_channel: str = message.channel.name
        files: list[discord.File] = []
        for item in message.attachments:
            if not item.url or len(item.url) < 20:
                continue
            # Attachment.read() is a coroutine, so the download no longer
            # blocks the event loop like the synchronous requests.get() did,
            # and there is no streamed response left unclosed.
            image_data: bytes = await item.read() or b''
            ext: str = item.url.split(".")[-1].split("?")[0].split("&")[0]
            # First file keeps the historical name "img.<ext>"; later ones
            # get a numeric suffix so names stay distinct.
            stem: str = "img" if not files else f"img{len(files)}"
            files.append(
                discord.File(io.BytesIO(image_data), filename=f"{stem}.{ext}")
            )
        notice: str = (
            f"*Performing bureaucratic duties (this didn't belong in #{message_channel})"
            f"...*\n**{message_author}:** {message_content}"
        )
        # The downloaded file used to be built but never handed to send(),
        # so attachments vanished when the original message was deleted.
        if files:
            await funhouse_channel.send(notice, files=files)
        else:
            await funhouse_channel.send(notice)
        await message.delete()
        await ctx.respond("OK!", ephemeral=True)
    except Exception as e:
        logging.debug("Exception: %s", str(e))
        traceback.print_exc()
        return await ctx.respond("Failed! :(", ephemeral=True)
# guild_ids now matches the home-server ID checked inside the command
# (the previous value was missing its leading digit).
@commands.user_command(name="Einsperren!", guild_ids=[1145182936002482196])
@commands.is_owner()
async def einsperren(self, ctx, member: discord.Member) -> None:
    """
    Einsperren!
    Toggle the "einsperren" role on a member of the home server. A member
    without the role has their current roles remembered and replaced by
    the einsperren role. A member who already has it gets the remembered
    roles back, or no roles at all if none were remembered.
    Args:
        ctx (Any): Discord context
        member (discord.Member): Discord member
    Returns:
        None
    """
    try:
        if ctx.guild.id != 1145182936002482196:
            return  # Not home server!
        if not member.roles:
            return  # No roles
        audit_reason: str = f"Einsperren von {ctx.user.display_name}"
        member = ctx.guild.get_member(member.id)
        member_display: str = member.display_name
        # The guard above guarantees the home server, so only its role ID
        # can ever be selected here.
        einsperren_role: discord.Role = ctx.guild.get_role(1235406301614309386)
        member_roles: list[discord.Role] = [
            role for role in member.roles if role.name != "@everyone"
        ]
        member_role_names: list[str] = [
            str(role.name).lower() for role in member_roles
        ]
        opers_chan: discord.TextChannel = ctx.guild.get_channel(1181416083287187546)

        if "einsperren" not in member_role_names:
            try:
                await member.edit(roles=[einsperren_role], reason=audit_reason)
                # Remember the roles captured before the edit (the filtered
                # list, i.e. without @everyone) so they can be restored.
                self.former_roles_store[member.id] = member_roles
                await ctx.respond(
                    f"Gesendet {member_display} an einsperren.", ephemeral=True
                )
                await opers_chan.send(
                    f"@everyone: {ctx.user.display_name} gesendet {member_display} an einsperren."
                )
            except Exception as e:
                logging.debug("Exception: %s", str(e))
                traceback.print_exc()
                return await ctx.respond("GOTTVERDAMMT!!", ephemeral=True)
            # Stop here. Falling through used to overwrite the saved roles
            # and run the release path straight after imprisoning.
            return

        # Member already has the role: release them
        release_reason: str = f"De-{audit_reason}"
        former_roles: Optional[list[discord.Role]] = self.former_roles_store.get(
            member.id
        )
        if former_roles is None:
            await member.edit(roles=[], reason=release_reason)  # No roles
        else:
            await member.edit(roles=former_roles, reason=release_reason)
            # Forget the saved roles only after they were restored
            self.former_roles_store.pop(member.id, None)
        await ctx.respond(
            f"{member_display} wurde von der Einsperre befreit.", ephemeral=True
        )
        await opers_chan.send(
            f"{member_display} wurde von {ctx.user.display_name} aus der Einsperre befreit."
        )
    except Exception as e:
        traceback.print_exc()
        return await ctx.respond(f"ERR: {str(e)}", ephemeral=True)
def _get_message_logger(self) -> Optional["MessageLogger"]:
    """Look up the 'MessageLogger' cog on the bot; None when get_cog() finds nothing."""
    # get_cog() already yields None for a missing cog, so its result is
    # returned as-is; the ignore only addresses the declared return type.
    return self.bot.get_cog('MessageLogger')  # type: ignore[return-value]
@bridge.bridge_command()
@commands.is_owner()
async def db_stats(self, ctx) -> None:
    """
    Get database statistics including image cache info.
    Args:
        ctx (Any): Discord context
    Returns:
        None
    """
    try:
        # The database handle lives on the message logger cog
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db:
            return await ctx.respond("Database not available.", ephemeral=True)
        stats = await message_logger.db.get_stats()
        uncached = await message_logger.db.get_uncached_counts()

        embed = discord.Embed(title="Database Statistics", color=discord.Color.blue())
        # (field label, stats key) pairs rendered as thousands-separated counts
        counters = (
            ("Messages", 'total_messages'),
            ("Users", 'total_users'),
            ("Guilds", 'total_guilds'),
            ("Channels", 'total_channels'),
            ("Attachments", 'total_attachments'),
            ("Reactions", 'total_reactions'),
            ("Cached Images", 'cached_images'),
        )
        for label, key in counters:
            embed.add_field(name=label, value=f"{stats.get(key, 0):,}", inline=True)
        embed.add_field(
            name="Cache Size",
            value=f"{stats.get('cached_images_size_mb', 0):.2f} MB",
            inline=True,
        )
        embed.add_field(
            name="Deleted Messages",
            value=f"{stats.get('deleted_messages', 0):,}",
            inline=True,
        )

        # Uncached counts, one line per category
        uncached_rows = (
            ("Avatars", 'users_missing_avatars'),
            ("Banners", 'users_missing_banners'),
            ("Guild Avatars", 'members_missing_guild_avatars'),
            ("Attachments", 'attachments_missing_cache'),
        )
        uncached_text = "\n".join(
            f"{label}: {uncached.get(key, 0):,}" for label, key in uncached_rows
        )
        embed.add_field(name="Uncached Images", value=uncached_text, inline=False)

        # Cache breakdown by type, only when the stats carry one
        by_type = stats.get('cached_by_type')
        if by_type:
            breakdown = "\n".join(f"{k}: {v:,}" for k, v in by_type.items())
            embed.add_field(name="Cache by Type", value=breakdown or "None", inline=False)

        await ctx.respond(embed=embed)
    except Exception as e:
        logging.error(f"Error getting db stats: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def backfill_avatars(self, ctx, batch_size: int = 50, max_batches: int = 10) -> None:
    """
    Backfill missing avatar images into the cache.
    Args:
        ctx (Any): Discord context
        batch_size (int): Passed through to db.backfill_missing_avatars
        max_batches (int): Passed through to db.backfill_missing_avatars
    Returns:
        None
    """
    try:
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db:
            return await ctx.respond("Database not available.", ephemeral=True)

        await ctx.respond(f"Starting avatar backfill (batch_size={batch_size}, max_batches={max_batches})...")
        stats = await message_logger.db.backfill_missing_avatars(batch_size, max_batches)

        embed = discord.Embed(title="Avatar Backfill Complete", color=discord.Color.green())
        # (field label, key prefix): each category has *_cached and *_processed
        categories = (
            ("Avatars", "avatars"),
            ("Banners", "banners"),
            ("Guild Avatars", "guild_avatars"),
        )
        for label, prefix in categories:
            cached = stats[prefix + "_cached"]
            processed = stats[prefix + "_processed"]
            embed.add_field(name=label, value=f"{cached}/{processed} cached", inline=True)
        await ctx.respond(embed=embed)
    except Exception as e:
        logging.error(f"Error in backfill_avatars: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def backfill_attachments(self, ctx, batch_size: int = 50, max_batches: int = 10) -> None:
    """
    Backfill missing attachment images into the cache.
    Args:
        ctx (Any): Discord context
        batch_size (int): Passed through to db.backfill_missing_attachments
        max_batches (int): Passed through to db.backfill_missing_attachments
    Returns:
        None
    """
    try:
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db:
            return await ctx.respond("Database not available.", ephemeral=True)

        await ctx.respond(f"Starting attachment backfill (batch_size={batch_size}, max_batches={max_batches})...")
        stats = await message_logger.db.backfill_missing_attachments(batch_size, max_batches)

        cached = stats['attachments_cached']
        processed = stats['attachments_processed']
        embed = discord.Embed(title="Attachment Backfill Complete", color=discord.Color.green())
        embed.add_field(name="Attachments", value=f"{cached}/{processed} cached", inline=True)
        await ctx.respond(embed=embed)
    except Exception as e:
        logging.error(f"Error in backfill_attachments: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def reset_history(self, ctx, guild_id: Optional[str] = None) -> None:
    """
    Reset history fetch progress to re-fetch messages (and reactions).
    Args:
        ctx (Any): Discord context
        guild_id (Optional[str]): Restrict the reset to this guild; when
            omitted, progress is reset for every channel
    Returns:
        None
    """
    try:
        logger_cog = self._get_message_logger()
        if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
            return await ctx.respond("Database not available.", ephemeral=True)
        pool = logger_cog.db.pool
        assert pool is not None  # Already checked above

        if guild_id:
            # Parse before touching the pool so a bad ID never costs a connection
            gid = int(guild_id)
            # Hold the pooled connection only for the UPDATE itself, not
            # while responding to Discord or feeding the fetch queue.
            async with pool.acquire() as conn:
                await conn.execute(
                    """
                    UPDATE history_fetch_progress SET is_complete = FALSE, messages_fetched = 0
                    WHERE channel_id IN (SELECT channel_id FROM channels WHERE guild_id = $1)
                    """,
                    gid,
                )
            await ctx.respond(f"Reset history progress for guild {guild_id}. Re-queueing channels...")
            # Re-queue channels for this guild
            guild = self.bot.get_guild(gid)
            if guild:
                queued = 0  # counted while queueing instead of a second pass
                for channel in guild.channels:
                    if isinstance(channel, (discord.TextChannel, discord.Thread)):
                        await logger_cog.history_fetch_queue.put(channel)
                        queued += 1
                await ctx.respond(f"Queued {queued} channels for re-fetch.")
            else:
                # Tell the caller instead of finishing without a word
                await ctx.respond(f"Guild {guild_id} not found; no channels queued.")
        else:
            # Reset all
            async with pool.acquire() as conn:
                await conn.execute("UPDATE history_fetch_progress SET is_complete = FALSE, messages_fetched = 0")
            await ctx.respond("Reset all history progress. Re-queueing all channels...")
            # Re-queue all channels
            await logger_cog._queue_all_channels_for_history()
            await ctx.respond(f"Queued {logger_cog.history_fetch_queue.qsize()} channels for re-fetch.")
    except Exception as e:
        logging.error(f"Error in reset_history: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def history_status(self, ctx) -> None:
    """
    Check the status of history fetching.
    Args:
        ctx (Any): Discord context
    Returns:
        None
    """
    try:
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db or not message_logger.db.pool:
            return await ctx.respond("Database not available.", ephemeral=True)

        # In-memory state of the fetcher
        queue_size = message_logger.history_fetch_queue.qsize()
        is_fetching = message_logger.is_fetching_history

        # Persisted progress
        pool = message_logger.db.pool
        assert pool is not None  # Already checked above
        async with pool.acquire() as conn:
            total_channels = await conn.fetchval("SELECT COUNT(*) FROM history_fetch_progress")
            complete = await conn.fetchval("SELECT COUNT(*) FROM history_fetch_progress WHERE is_complete = TRUE")
            incomplete = await conn.fetchval("SELECT COUNT(*) FROM history_fetch_progress WHERE is_complete = FALSE")
            total_fetched = await conn.fetchval("SELECT SUM(messages_fetched) FROM history_fetch_progress")

        fields = (
            ("Queue Size", f"{queue_size} channels"),
            ("Is Fetching", "Yes" if is_fetching else "No"),
            ("Channels Tracked", f"{total_channels or 0}"),
            ("Complete", f"{complete or 0}"),
            ("Incomplete", f"{incomplete or 0}"),
            ("Messages Fetched", f"{total_fetched or 0:,}"),
        )
        embed = discord.Embed(title="History Fetch Status", color=discord.Color.blue())
        for label, text in fields:
            embed.add_field(name=label, value=text, inline=True)
        await ctx.respond(embed=embed)
    except Exception as e:
        logging.error(f"Error in history_status: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def video_status(self, ctx) -> None:
    """
    Check video caching status and failures.
    Args:
        ctx (Any): Discord context
    Returns:
        None
    """
    try:
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db or not message_logger.db.pool:
            return await ctx.respond("Database not available.", ephemeral=True)

        counts = await message_logger.db.get_uncached_video_counts()
        fields = (
            ("Cached Videos", f"{counts.get('total_cached_videos', 0):,}"),
            ("Total Size", f"{counts.get('total_cached_videos_size_gb', 0):.2f} GB"),
            ("Failed Downloads", f"{counts.get('failed_video_downloads', 0):,}"),
            ("Attachments Pending", f"{counts.get('attachments_missing_video_cache', 0):,}"),
            ("Embeds Pending", f"{counts.get('embeds_missing_video_cache', 0):,}"),
        )
        embed = discord.Embed(title="Video Cache Status", color=discord.Color.blue())
        for label, text in fields:
            embed.add_field(name=label, value=text, inline=True)
        await ctx.respond(embed=embed)
    except Exception as e:
        logging.error(f"Error in video_status: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def video_failures(self, ctx, limit: int = 10) -> None:
    """
    Show recent video download failures.
    At most ten entries are displayed, regardless of `limit`.
    Args:
        ctx (Any): Discord context
        limit (int): Passed through to db.get_video_download_failures
    Returns:
        None
    """

    def _shorten(text: str, max_len: int) -> str:
        """Cut text to max_len characters, marking the cut with '...'."""
        return text[:max_len] + "..." if len(text) > max_len else text

    try:
        logger_cog = self._get_message_logger()
        if not logger_cog or not logger_cog.db or not logger_cog.db.pool:
            return await ctx.respond("Database not available.", ephemeral=True)
        failures = await logger_cog.db.get_video_download_failures(limit)
        if not failures:
            return await ctx.respond("No video download failures recorded.", ephemeral=True)
        lines = []
        for f in failures:
            url = _shorten(f['source_url'], 60)
            # A missing message used to be rendered as the literal "None"
            error = _shorten(f['error_message'], 80) if f['error_message'] else "(no error message)"
            lines.append(f"**{f['source_type']}** (x{f['error_count']}): `{url}`\n{error}")
        shown = lines[:10]  # Limit to avoid message too long
        content = "\n".join(shown)
        # Report the number actually displayed. With limit > 10 the title
        # used to claim more entries than the embed contained.
        embed = discord.Embed(
            title=f"Video Failures (showing {len(shown)})",
            description=content,
            color=discord.Color.red(),
        )
        await ctx.respond(embed=embed)
    except Exception as e:
        logging.error(f"Error in video_failures: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def video_retry(self, ctx, older_than_days: int = 7) -> None:
    """
    Clear old video failures to allow retrying downloads.
    Args:
        ctx (Any): Discord context
        older_than_days (int): Passed through to db.clear_video_failures
    Returns:
        None
    """
    try:
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db or not message_logger.db.pool:
            return await ctx.respond("Database not available.", ephemeral=True)

        deleted = await message_logger.db.clear_video_failures(older_than_days)
        summary = f"Cleared {deleted} video failures older than {older_than_days} days. They will be retried on next backfill cycle."
        await ctx.respond(summary, ephemeral=True)
    except Exception as e:
        logging.error(f"Error in video_retry: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def video_retry_all(self, ctx) -> None:
    """
    Clear ALL video failures to retry everything.
    Args:
        ctx (Any): Discord context
    Returns:
        None
    """
    try:
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db or not message_logger.db.pool:
            return await ctx.respond("Database not available.", ephemeral=True)

        pool = message_logger.db.pool
        async with pool.acquire() as conn:
            result = await conn.execute("DELETE FROM video_download_failures")
        # The row count is taken from the last word of the returned status;
        # anything unparsable is reported as 0.
        try:
            deleted = int(result.split()[-1])
        except Exception:
            deleted = 0
        summary = f"Cleared {deleted} video failures. All will be retried on next backfill cycle."
        await ctx.respond(summary, ephemeral=True)
    except Exception as e:
        logging.error(f"Error in video_retry_all: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def video_retry_fetch(self, ctx, limit: int = 20) -> None:
    """
    Attempt to refresh attachment URLs for recent failed attachment downloads.

    For each of the most recent failures with source_type='attachment':
    find the attachment row by URL, find the channel of its message,
    re-fetch the message, pick the attachment with the same filename,
    store its URL and delete the failure record so the next backfill
    cycle retries it.
    Args:
        ctx (Any): Discord context
        limit (int): Maximum number of failure rows to examine
    Returns:
        None
    """
    try:
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db or not message_logger.db.pool:
            return await ctx.respond("Database not available.", ephemeral=True)
        pool = message_logger.db.pool

        async with pool.acquire() as conn:
            failures = await conn.fetch("SELECT source_url, source_type FROM video_download_failures WHERE source_type = 'attachment' ORDER BY last_failure DESC LIMIT $1", limit)
        if not failures:
            return await ctx.respond("No recent attachment download failures found.", ephemeral=True)

        updated = 0
        not_found = []
        for failure in failures:
            src_url = failure['source_url']

            # Attachment row that still carries the failing URL
            async with pool.acquire() as conn:
                row = await conn.fetchrow("SELECT attachment_id, message_id, filename FROM attachments WHERE url = $1 LIMIT 1", src_url)
            if not row:
                not_found.append(src_url)
                continue
            attachment_id = row['attachment_id']
            message_id = row['message_id']
            filename = row['filename']

            # Channel the message was posted in
            async with pool.acquire() as conn:
                ch_row = await conn.fetchrow("SELECT channel_id FROM messages WHERE message_id = $1 LIMIT 1", message_id)
            if not ch_row or not ch_row.get('channel_id'):
                not_found.append(src_url)
                continue
            channel_id = ch_row['channel_id']

            channel = self.bot.get_channel(channel_id)
            if not channel:
                not_found.append(src_url)
                continue

            try:
                # The cached object may lack fetch_message; fall back to an API fetch
                if not hasattr(channel, 'fetch_message'):
                    channel = await self.bot.fetch_channel(channel_id)
                if not hasattr(channel, 'fetch_message'):
                    raise RuntimeError("channel doesn't support fetch_message")
                # getattr keeps static analyzers quiet about the dynamic attribute
                fetch = getattr(channel, 'fetch_message')
                message = await fetch(message_id)
            except Exception:
                not_found.append(src_url)
                continue

            # First attachment on the fresh message with the stored filename
            new_url = next(
                (att.url for att in message.attachments if att.filename == filename),
                None,
            )
            if not new_url:
                not_found.append(src_url)
                continue

            # Store the new URL and drop the failure record
            async with pool.acquire() as conn:
                await conn.execute("UPDATE attachments SET url = $1 WHERE attachment_id = $2", new_url, attachment_id)
                await conn.execute("DELETE FROM video_download_failures WHERE source_url = $1", src_url)
            updated += 1

        # Every fetched failure was visited, so the scanned count is the row count
        found = len(failures)
        resp = f"Scanned {found} failures; updated {updated} attachment URLs; {len(not_found)} not updated."
        if not_found:
            resp += "\nNot updated examples: " + ", ".join(not_found[:3])
        await ctx.respond(resp, ephemeral=True)
    except Exception as e:
        logging.error(f"Error in video_retry_fetch: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
@bridge.bridge_command()
@commands.is_owner()
async def video_backfill(self, ctx, batch_size: int = 10, max_batches: int = 0) -> None:
    """
    Trigger a video backfill run. Set max_batches=0 (default) for unlimited.

    Usage:
        .video_backfill          (unlimited with batch_size=10)
        .video_backfill 25       (unlimited with batch_size=25)
        .video_backfill 10 100   (batch_size=10, max_batches=100)
    Args:
        ctx (Any): Discord context
        batch_size (int): Passed to db.backfill_missing_videos
        max_batches (int): Batch cap; 0 is forwarded as None
    Returns:
        None
    """
    try:
        message_logger = self._get_message_logger()
        if not message_logger or not message_logger.db or not message_logger.db.pool:
            return await ctx.respond("Database not available.", ephemeral=True)

        # Look at the pending counts before starting any work
        counts = await message_logger.db.get_uncached_video_counts()
        msg_pending = counts.get('messages_with_uncached_videos', 0)
        att_pending = counts.get('attachments_missing_video_cache', 0)
        emb_pending = counts.get('embeds_missing_video_cache', 0)
        if msg_pending == 0:
            return await ctx.respond(f"No videos need caching. (Cached: {counts.get('total_cached_videos', 0)}, Failed: {counts.get('failed_video_downloads', 0)})", ephemeral=True)

        # 0 means "no cap", which the backfill call receives as None
        cap = max_batches if max_batches != 0 else None
        batches_label = 'unlimited' if cap is None else max_batches
        await ctx.respond(f"Starting video backfill (batch_size={batch_size}, max_batches={batches_label})...\nMessages with uncached videos: {msg_pending} ({att_pending} attachments, {emb_pending} embeds)", ephemeral=True)

        stats = await message_logger.db.backfill_missing_videos(batch_size=batch_size, max_batches=cap)

        embed = discord.Embed(title="Video Backfill Complete", color=discord.Color.green())
        result_fields = (
            ("Attachments Processed", 'attachments_processed'),
            ("Attachments Cached", 'attachments_cached'),
            ("Embeds Processed", 'embeds_processed'),
            ("Embeds Cached", 'embeds_cached'),
        )
        for label, key in result_fields:
            embed.add_field(name=label, value=str(stats.get(key, 0)), inline=True)
        await ctx.respond(embed=embed, ephemeral=True)
    except Exception as e:
        logging.error(f"Error in video_backfill: {e}")
        traceback.print_exc()
        await ctx.respond(f"Error: {str(e)}", ephemeral=True)
def setup(bot) -> None:
    """
    Run on Cog Load
    Builds the Owner cog and adds it to the given bot.
    """
    owner_cog = Owner(bot)
    bot.add_cog(owner_cog)