347 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			347 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import io
 | |
| import random
 | |
| import asyncio
 | |
| import logging
 | |
| import traceback
 | |
| from typing import Optional
 | |
| import discord
 | |
| import requests
 | |
| from discord.ext import bridge, commands
 | |
| from disc_havoc import Havoc
 | |
| import util
 | |
| 
 | |
| 
 | |
class Owner(commands.Cog):
    """Owner Cog for Havoc.

    Bundles the owner-only utilities of the bot: a toy "temperature"
    setting, message editing / relaying, presence changes, bulk purge,
    "move this message to another channel" context-menu commands and the
    "Einsperren" (lock-up) role toggle.

    NOTE(review): py-cord derives the user-visible command help text from
    the command docstrings, so the summary line of every bridge command is
    kept exactly as it was; extra explanation lives in ``#`` comments.
    """

    # Guild the guild-scoped commands are registered to (the "home server").
    HOME_GUILD_ID: int = 1145182936002482196
    # Role that replaces all of a member's roles while they are locked up.
    EINSPERREN_ROLE_ID: int = 1235406301614309386
    # Channel that receives the lock-up / release notices.
    OPERS_CHANNEL_ID: int = 1181416083287187546
    # Destination channels of the "Move to ..." message commands.
    MEMES_CHANNEL_ID: int = 1147229098544988261
    DRUGS_CHANNEL_ID: int = 1172247451047034910
    FUNHOUSE_CHANNEL_ID: int = 1213160512364478607
    # Accepted range (inclusive, in °C) for the temperature command.
    MIN_TEMPERATURE: int = -15
    MAX_TEMPERATURE: int = 35

    def __init__(self, bot: Havoc) -> None:
        self.bot: Havoc = bot
        # member id -> roles the member held before being locked up
        self.former_roles_store: dict[int, list[discord.Role]] = {}
        # Purely cosmetic value reported/changed by the temperature command.
        self._temperature: int = random.randrange(20, 30)

    # Without an argument the current value is shown to anyone; changing
    # the value is restricted to the bot owner.
    @bridge.bridge_command(guild_ids=[HOME_GUILD_ID])
    async def temperature(self, ctx, temp: Optional[int | str] = None) -> None:
        """
        Set Temperature
        """
        # Compare against None/"" explicitly: a requested temperature of 0
        # is falsy and must not be mistaken for "no argument given".
        if temp is None or temp == "":
            await ctx.respond(f"The current temperature is: {self._temperature} °C")
            return

        # Bot.is_owner() is a coroutine; without the await the (always
        # truthy) coroutine object made this check pass for every user.
        if not await self.bot.is_owner(ctx.author):
            await ctx.respond("I am afraid I can't let you do that.")
            return

        try:
            requested: int = int(temp)
        except (TypeError, ValueError) as e:
            logging.debug("Exception: %s", str(e))
            await ctx.respond("Invalid input")
            return

        if requested < self.MIN_TEMPERATURE:
            await ctx.respond(f"Too cold! ({self.MIN_TEMPERATURE}°C minimum)")
            return
        if requested > self.MAX_TEMPERATURE:
            await ctx.respond(f"Too hot! ({self.MAX_TEMPERATURE}°C maximum)")
            return

        self._temperature = requested
        await ctx.respond(
            f"As per your request, I have adjusted the temperature to {requested} °C."
        )

    @bridge.bridge_command()
    @commands.is_owner()
    async def editmsg(self, ctx, msgid: str, *, newcontent: str) -> None:
        """
        Edit a message previously sent by the bot
        """
        try:
            # A non-numeric msgid raises ValueError here and is reported
            # through the generic failure reply below.
            message: Optional[discord.Message] = self.bot.get_message(int(msgid))
            if not message:
                await ctx.respond(
                    f"**Failed:** Message {msgid} not found.", ephemeral=True
                )
                return None
            await message.edit(content=newcontent)
            await ctx.respond("**Done!**", ephemeral=True)
        except Exception as e:
            await ctx.respond(f"**Failed:** {str(e)}", ephemeral=True)

    @bridge.bridge_command()
    @commands.is_owner()
    async def reload(self, ctx) -> None:
        """
        Reload Cogs
        """
        self.bot.load_exts(False)
        await ctx.respond("Reloaded!", ephemeral=True)

    # Usage: say <chan> <msg> -- the first space-separated token names the
    # target channel (a channel mention is reduced to its numeric id), the
    # remainder is the text to send.
    @bridge.bridge_command()
    @commands.is_owner()
    async def say(self, ctx, *, parameters: str) -> None:
        """
        Make me say something in a channel
        """
        pieces: list[str] = parameters.split(" ", 1)
        if len(pieces) < 2:
            await ctx.respond(
                "**Error**: Incorrect command usage; required: <chan> <msg>",
                ephemeral=True,
            )
            return

        channel, msg = pieces
        channel_mentions: list[int] = discord.utils.raw_channel_mentions(channel)
        if channel_mentions:
            channel = str(channel_mentions[0])
        await util.discord_helpers.send_message(self.bot, channel=channel, message=msg)
        await ctx.respond("**Done.**", ephemeral=True)

    @bridge.bridge_command()
    @commands.is_owner()
    async def chgstatus(self, ctx, *, status: Optional[str] = None) -> None:
        """
        Change bots status
        """
        if not status:
            await ctx.respond("ERR: No status provided to change to!", ephemeral=True)
            return

        await self.bot.change_presence(
            status=discord.Status.online,
            activity=discord.CustomActivity(name=status.strip()),
        )
        await ctx.respond("Done!", ephemeral=True)

    @commands.message_command(name="Remove Messages Starting Here")
    @commands.is_owner()
    async def purge(self, ctx, message: discord.Message) -> None:
        """
        Purge Messages

        Deletes every message after the selected one, then the selected
        message itself, and finally removes the confirmation reply.

        Args:
            ctx (Any): Discord context
            message (discord.Message): Discord message
        Returns:
            None
        """
        audit_reason: str = f"Purge initiated by {ctx.author.display_name}"
        try:
            await ctx.channel.purge(
                after=message,
                bulk=True,
                limit=900000,
                reason=audit_reason,
            )
            await message.delete(reason=audit_reason)
            await ctx.respond("**Done!**")
            # Wait 3 seconds, then delete interaction
            await asyncio.sleep(3)
            interaction = await ctx.interaction.original_response()
            await interaction.delete()
        except Exception as e:
            traceback.print_exc()
            await ctx.respond(f"**ERR: {str(e)}**")

    @staticmethod
    def _attachment_extension(url: str) -> str:
        """Return the text after the last "." of url, minus any query part."""
        return url.split(".")[-1].split("?")[0].split("&")[0]

    async def _collect_attachments(self, message: discord.Message) -> list:
        """Download the attachments of message as ``discord.File`` objects.

        Attachments without a usable URL (empty or shorter than 20
        characters) are skipped. The bytes are fetched with the library's
        own awaitable ``Attachment.read()`` so the event loop is not blocked
        by a synchronous HTTP request.
        """
        files: list = []
        for attachment in message.attachments:
            url = attachment.url
            if not url or len(url) < 20:
                continue
            payload: bytes = await attachment.read()
            # First file keeps the historic "img.<ext>" name; further files
            # are numbered so names within one upload stay distinct.
            stem = "img" if not files else f"img{len(files) + 1}"
            files.append(
                discord.File(
                    io.BytesIO(payload),
                    filename=f"{stem}.{self._attachment_extension(url)}",
                )
            )
        return files

    async def _move_message(
        self,
        ctx,
        message: discord.Message,
        channel_id: int,
        *,
        require_attachment: bool,
    ) -> None:
        """Re-post message (with its attachments) in another channel.

        Shared implementation of the "Move to ..." message commands: the
        content and attachments are sent to the guild channel channel_id,
        then the original message is deleted.

        Args:
            ctx (Any): Discord context
            message (discord.Message): Message to move
            channel_id (int): ID of the destination channel
            require_attachment (bool): When True, do nothing if the message
                has no usable attachment
        Returns:
            None
        """
        try:
            if not isinstance(message.channel, discord.TextChannel):
                return
            destination = ctx.guild.get_channel(channel_id)
            files = await self._collect_attachments(message)
            if require_attachment and not files:
                return  # No file to move
            notice: str = (
                "*Performing bureaucratic duties (this didn't belong in "
                f"#{message.channel.name})...*\n"
                f"**{message.author.display_name}:** {message.content}"
            )
            if files:
                await destination.send(notice, files=files)
            else:
                await destination.send(notice)
            # Only remove the original once the copy has been delivered.
            await message.delete()
            await ctx.respond("OK!", ephemeral=True)
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()
            await ctx.respond("Failed! :(", ephemeral=True)

    @commands.message_command(name="Move to Memes")
    @commands.is_owner()
    async def movememe(self, ctx, message: discord.Message) -> None:
        """
        Move to Memes

        Args:
            ctx (Any): Discord context
            message (discord.Message): Discord message
        Returns:
            None
        """
        await self._move_message(
            ctx, message, self.MEMES_CHANNEL_ID, require_attachment=True
        )

    @commands.message_command(name="Move to Drugs")
    @commands.is_owner()
    async def movedrugs(self, ctx, message: discord.Message) -> None:
        """
        Move to Drugs

        Args:
            ctx (Any): Discord context
            message (discord.Message): Discord message
        Returns:
            None
        """
        await self._move_message(
            ctx, message, self.DRUGS_CHANNEL_ID, require_attachment=True
        )

    @commands.message_command(name="Move to fun-house")
    @commands.is_owner()
    async def movefunhouse(self, ctx, message: discord.Message) -> None:
        """
        Move to fun-house

        Unlike the other move commands, plain-text messages are moved too.

        Args:
            ctx (Any): Discord context
            message (discord.Message): Discord message
        Returns:
            None
        """
        await self._move_message(
            ctx, message, self.FUNHOUSE_CHANNEL_ID, require_attachment=False
        )

    @commands.user_command(name="Einsperren!", guild_ids=[HOME_GUILD_ID])
    @commands.is_owner()
    async def einsperren(self, ctx, member: discord.Member) -> None:
        """
        Einsperren!

        Toggle: a member without the "einsperren" role has all roles
        replaced by it (the previous roles are remembered); a member who
        already has it gets the remembered roles back.

        Args:
            ctx (Any): Discord context
            member (discord.Member): Discord member
        Returns:
            None
        """
        try:
            if ctx.guild.id != self.HOME_GUILD_ID:
                return  # Not home server!
            if not member.roles:
                return  # No roles
            audit_reason: str = f"Einsperren von {ctx.user.display_name}"
            member = ctx.guild.get_member(member.id)
            member_display: str = member.display_name
            einsperren_role: discord.Role = ctx.guild.get_role(self.EINSPERREN_ROLE_ID)
            # @everyone is implicit for every member and is therefore
            # neither inspected nor stored for later restoration.
            member_roles: list = [
                role for role in member.roles if role.name != "@everyone"
            ]
            member_role_names: list[str] = [
                str(role.name).lower() for role in member_roles
            ]
            opers_chan: discord.TextChannel = ctx.guild.get_channel(
                self.OPERS_CHANNEL_ID
            )

            if "einsperren" not in member_role_names:
                # Lock up. Remember the roles *before* editing; reading
                # member.roles afterwards could already yield the new list.
                self.former_roles_store[member.id] = member_roles
                try:
                    await member.edit(roles=[einsperren_role], reason=audit_reason)
                    await ctx.respond(
                        f"Gesendet {member_display} an einsperren.", ephemeral=True
                    )
                    await opers_chan.send(
                        f"@everyone: {ctx.user.display_name} gesendet {member_display} an einsperren."
                    )
                except Exception as e:
                    logging.debug("Exception: %s", str(e))
                    traceback.print_exc()
                    await ctx.respond("GOTTVERDAMMT!!", ephemeral=True)
                # Stop here: falling through would immediately run the
                # release path below and undo the lock-up.
                return

            # Release: restore the remembered roles, or none if unknown
            # (e.g. the role was assigned by other means).
            former_roles: list = self.former_roles_store.get(member.id, [])
            await member.edit(roles=former_roles, reason=f"De-{audit_reason}")

            await ctx.respond(
                f"{member_display} wurde von der Einsperre befreit.", ephemeral=True
            )
            await opers_chan.send(
                f"{member_display} wurde von {ctx.user.display_name} aus der Einsperre befreit."
            )
        except Exception as e:
            traceback.print_exc()
            await ctx.respond(f"ERR: {str(e)}", ephemeral=True)
 | |
| 
 | |
| 
 | |
def setup(bot) -> None:
    """Extension entry point: build an ``Owner`` cog and attach it to ``bot``."""
    owner_cog = Owner(bot)
    bot.add_cog(owner_cog)
 |