"""Meme cog for Havoc: interactive meme creation and webhook auto-posters."""

import asyncio
import io
import json
import logging
import os
import random
import textwrap
import traceback
from typing import LiteralString, Optional, Any, Callable

import discord
import regex
import requests
from aiohttp import ClientSession
from discord.ext import bridge, commands, tasks

import constants
import scrapers.dinosaur_scrape as dinog
import scrapers.explosm_scrape as explosmg
import scrapers.onion_scrape as oniong
import scrapers.qc_scrape as qcg
import scrapers.reddit_scrape as memeg
import scrapers.smbc_scrape as smbcg
import scrapers.thn_scrape as thng
import scrapers.xkcd_scrape as xkcdg
from disc_havoc import Havoc
from util.jesusmemes import JesusMemeGenerator

# Populated by Helper.load_meme_choices() (run while the MemeView class body executes)
meme_choices = []
# Rebound to the bot's BOT_CHANIDS when the Meme cog is constructed
BOT_CHANIDS = []

# (connect, read) timeouts in seconds passed to requests for image downloads
_MEME_TIMEOUT: tuple = (5, 30)
_COMIC_TIMEOUT: tuple = (5, 20)
# Seconds to sleep after each webhook post
_POST_DELAY: int = 2


def _url_extension(url: str) -> str:
    """
    Derive a filename extension from a URL
    Takes the text after the last '.' and cuts it at the first '?' or '&',
    e.g. 'https://host/a/img.png?w=1' -> 'png'. No validation is performed.
    Args:
        url (str): the resource URL
    Returns:
        str
    """
    return url.split(".")[-1].split("?")[0].split("&")[0]


def _fix_qc_url(url: str) -> str:
    """
    Repair the two URL typos the QC branch has always corrected
    ('http://ww.' prefix and '.pmg' suffix)
    Args:
        url (str): comic image URL as returned by the QC grabber
    Returns:
        str
    """
    url = regex.sub(r'^http://ww\.', 'http://www.', url)
    return regex.sub(r'\.pmg$', '.png', url)


def _fetch_bytes(url: str, headers: dict, timeout: tuple) -> bytes:
    """
    Download a URL and return the response body (BLOCKING)
    Call through asyncio.to_thread() from coroutines so the event loop is
    not stalled while waiting on the network.
    Args:
        url (str): resource to download
        headers (dict): HTTP request headers
        timeout (tuple): (connect, read) timeout in seconds
    Returns:
        bytes
    Raises:
        requests.RequestException: on connection failure or a 4xx/5xx status
    """
    # The context manager closes the response, releasing the connection
    with requests.get(url, timeout=timeout, headers=headers) as response:
        response.raise_for_status()
        return response.content


def _article_embed(title: str, description: str, link: str) -> discord.Embed:
    """
    Build the embed used for article feeds
    Args:
        title (str): embed title
        description (str): article text; only the first wrapped line is used
        link (str): article URL, appended beneath the text
    Returns:
        discord.Embed
    """
    # textwrap.wrap() returns [] for empty/whitespace-only text, so guard the [0]
    wrapped: list[str] = textwrap.wrap(text=description, width=860, max_lines=1)
    summary: str = wrapped[0] if wrapped else ""
    embed: discord.Embed = discord.Embed(title=title)
    embed.add_field(name="Content", value=f"{summary[0:960]}\n-# {link}")
    return embed


async def _safe_get(grabber: Any, label: str) -> list:
    """
    Await grabber.get(), converting any failure into an empty result
    Args:
        grabber (Any): object exposing an awaitable get()
        label (str): feed name used in the log message
    Returns:
        list
    """
    try:
        return await grabber.get() or []
    except Exception:
        logging.exception("%s grabber failed", label)
        return []


class Helper:
    """Meme Helper"""

    def load_meme_choices(self) -> None:
        """Load Available Meme Templates from JSON File"""
        global meme_choices
        # memes.json is expected next to this module
        memes_file: str|LiteralString = os.path.join(os.path.dirname(__file__),
                                                     "memes.json")
        with open(memes_file, 'r', encoding='utf-8') as f:
            meme_choices = json.load(f)


class MemeView(discord.ui.View):
    """Meme Selection discord.ui.View"""
    # Runs while the class body executes so that meme_choices is populated
    # before the select decorator below builds its option list.
    helper = Helper()
    helper.load_meme_choices()

    @discord.ui.select(
        placeholder = "Choose a Meme!",
        min_values = 1,
        max_values = 1,
        options = [
            discord.SelectOption(
                label=meme_label
            ) for meme_label in meme_choices[0:24]  # only the first 24 templates are offered
        ]
    )
    async def select_callback(self, select: discord.ui.Select,
                              interaction: discord.Interaction) -> None:
        """Meme Selection Callback"""
        if not isinstance(select.values[0], str):
            return
        modal: discord.ui.Modal = MemeModal(meme=select.values[0],
                                            title="Meme Selected")
        await interaction.response.send_modal(modal)


class MemeModal(discord.ui.Modal):
    """Meme Creation discord.ui.Modal"""

    def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.selected_meme: Optional[str] = meme
        self.meme_generator = JesusMemeGenerator()
        self.TEXT_LIMIT: int = 80  # max characters per line
        self.add_item(discord.ui.InputText(label="Top Text",
                                           style=discord.InputTextStyle.singleline))
        self.add_item(discord.ui.InputText(label="Bottom Text",
                                           style=discord.InputTextStyle.singleline))

    async def callback(self, interaction: discord.Interaction) -> None:
        """
        Validate the submitted text, generate the meme and reply with an embed
        Args:
            interaction (discord.Interaction): the modal submission
        Returns:
            None
        """
        if not self.selected_meme:  # No meme selected
            return
        selected_meme: str = self.selected_meme
        if not self.children or len(self.children) < 2:  # Invalid request
            return
        top_value = self.children[0].value
        bottom_value = self.children[1].value
        if not isinstance(top_value, str) or not isinstance(bottom_value, str):
            return  # Invalid request
        meme_top_line: str = top_value.strip()
        meme_bottom_line: str = bottom_value.strip()
        if len(meme_top_line) > self.TEXT_LIMIT or len(meme_bottom_line) > self.TEXT_LIMIT:
            await interaction.response.send_message("ERR: Text is limited to 80 characters for each the top and bottom lines.")
            return

        meme_link: Optional[str] = await self.meme_generator.create_meme(
            top_line=meme_top_line,
            bottom_line=meme_bottom_line,
            meme=selected_meme,
        )
        if not meme_link:
            await interaction.response.send_message("Failed!")
            return
        embed: discord.Embed = discord.Embed(title="Generated Meme")
        embed.set_image(url=meme_link)
        embed.add_field(name="Meme", value=selected_meme, inline=True)
        await interaction.response.send_message(embeds=[embed])


class Meme(commands.Cog):
    """Meme Cog for Havoc"""

    def __init__(self, bot: Havoc) -> None:
        self.bot: Havoc = bot
        self.meme_choices: list = []
        self.meme_counter: int = 0
        # Format: feed key -> {channel id: [webhook URL, thread id]}
        self.THREADS: dict[str, dict[int, list]] = {
            'comic_explosm': {
                1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367],
                1306414795049926676: [constants.EXPLOSM_WEBHOOK2, 1306416492304138364],
            },
            'comic_xkcd': {
                1298729744216359055: [constants.XKCD_WEBHOOK, 1299165928755433483],
                1306414795049926676: [constants.XKCD_WEBHOOK2, 1306416681991798854],
            },
            'comic_smbc': {
                1298729744216359055: [constants.SMBC_WEBHOOK, 1299166071038808104],
                1306414795049926676: [constants.SMBC_WEBHOOK2, 1306416842511745024],
            },
            'comic_qc': {
                1298729744216359055: [constants.QC_WEBHOOK, 1299392115364593674],
                1306414795049926676: [constants.QC_WEBHOOK2, 1306417084774744114],
            },
            'comic_dino': {
                1298729744216359055: [constants.DINO_WEBHOOK, 1299771918886506557],
                1306414795049926676: [constants.DINO_WEBHOOK2, 1306417286713704548],
            }
        }

        # Format: feed key -> [webhook URL, ...] (posted without a thread)
        self.NO_THREAD_WEBHOOKS: dict[str, list] = {
            'theonion': [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2],
            'thn': [constants.THN_WEBHOOK],
            'memes': [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2],
        }

        global BOT_CHANIDS
        BOT_CHANIDS = self.bot.BOT_CHANIDS  # Inherit

        self.meme_stream_loop.start()
        self.explosm_loop.start()

    def is_spamchan() -> bool:  # type: ignore
        """Check if channel is spamchan"""
        def predicate(ctx):
            try:
                if ctx.channel.id not in BOT_CHANIDS:
                    logging.debug("%s not found in %s", ctx.channel.id, BOT_CHANIDS)
                return ctx.channel.id in BOT_CHANIDS
            except Exception:
                traceback.print_exc()
                return False
        return commands.check(predicate)  # type: ignore

    async def _post_memes(self, memes: list, headers: dict) -> None:
        """
        Download each meme image and post it to every 'memes' webhook
        Args:
            memes (list): (id, title, url) tuples; falsy entries are skipped
            headers (dict): HTTP headers for the image download
        Returns:
            None
        """
        for meme in memes:
            if not meme:
                continue
            try:
                (_meme_id, _meme_title, meme_url) = meme
                # Run the blocking download off the event loop
                meme_content: bytes = await asyncio.to_thread(
                    _fetch_bytes, meme_url, headers, _MEME_TIMEOUT)
            except Exception:
                logging.exception("Meme download failed: %s", meme)
                continue
            ext: str = _url_extension(meme_url)
            async with ClientSession() as session:
                for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes', []):
                    try:
                        webhook: discord.Webhook = discord.Webhook.from_url(
                            meme_hook, session=session)
                        # Fresh BytesIO per send: a sent file's buffer is consumed
                        await webhook.send(
                            file=discord.File(io.BytesIO(meme_content),
                                              filename=f'img.{ext}'),
                            username="r/memes")
                    except Exception:
                        logging.exception("Meme post failed: %s", meme_url)
                    await asyncio.sleep(_POST_DELAY)

    async def _post_comics(self, comics: list, thread_key: str, username: str,
                           headers: dict,
                           fix_url: Optional[Callable[[str], str]] = None) -> None:
        """
        Download each comic image and post it to every thread configured
        under self.THREADS[thread_key]
        Args:
            comics (list): (title, url) tuples; falsy entries are skipped
            thread_key (str): key into self.THREADS
            username (str): display name used for the webhook message
            headers (dict): HTTP headers for the image download
            fix_url (Optional[Callable[[str], str]]): applied to each URL before download
        Returns:
            None
        """
        targets: dict[int, list] = self.THREADS.get(thread_key, {})
        for comic in comics:
            if not comic:
                continue
            try:
                (comic_title, comic_url) = comic
                comic_title = discord.utils.escape_markdown(comic_title)
                if fix_url:
                    comic_url = fix_url(comic_url)
                logging.debug("Trying %s: %s", username, comic_url)
                # Run the blocking download off the event loop
                comic_content: bytes = await asyncio.to_thread(
                    _fetch_bytes, comic_url, headers, _COMIC_TIMEOUT)
            except Exception:
                logging.exception("%s download failed: %s", username, comic)
                continue
            ext: str = _url_extension(comic_url)
            async with ClientSession() as session:
                for chanid, (hook_uri, thread_id) in targets.items():
                    try:
                        _channel: Any = self.bot.get_channel(chanid)
                        if not _channel:
                            # Skip only this target; other targets/feeds still run
                            logging.warning("Channel %s not found; skipping %s post",
                                            chanid, username)
                            continue
                        # get_thread() returns None for uncached threads; a bare
                        # Object still carries the id the webhook needs
                        thread = (_channel.get_thread(thread_id)
                                  or discord.Object(id=thread_id))
                        webhook: discord.Webhook = discord.Webhook.from_url(
                            hook_uri, session=session)
                        await webhook.send(
                            f"**{comic_title}**",
                            file=discord.File(io.BytesIO(comic_content),
                                              filename=f'img.{ext}'),
                            username=username, thread=thread)
                    except Exception:
                        logging.exception("%s post failed (channel %s)",
                                          username, chanid)
                    await asyncio.sleep(_POST_DELAY)

    async def _broadcast_embed(self, embed: discord.Embed, hook_key: str,
                               username: str, video: Optional[str] = None) -> None:
        """
        Send an embed (and optional video follow-up) to every webhook under
        self.NO_THREAD_WEBHOOKS[hook_key]
        Args:
            embed (discord.Embed): embed to send
            hook_key (str): key into self.NO_THREAD_WEBHOOKS
            username (str): display name used for the embed message
            video (Optional[str]): if truthy, sent as a second message
        Returns:
            None
        """
        async with ClientSession() as session:
            for hook_uri in self.NO_THREAD_WEBHOOKS.get(hook_key, []):
                try:
                    webhook: discord.Webhook = discord.Webhook.from_url(
                        hook_uri, session=session)
                    await webhook.send(embed=embed, username=username)
                    if video:
                        await webhook.send(f"^ video: {video}")
                except Exception:
                    logging.exception("%s post failed", username)
                await asyncio.sleep(_POST_DELAY)

    async def do_autos(self, only_comics: Optional[bool] = False) -> None:
        """
        Run Auto Posters
        Fetches every feed, then posts memes (unless only_comics), the five
        comic feeds, The Onion and The Hacker News. A failure in one grabber,
        item or webhook is logged and does not stop the remaining work.
        Args:
            only_comics (Optional[bool]): default False; when true the r/memes
                posts are skipped (article feeds are still posted)
        Returns:
            None
        """
        try:
            # NOTE(review): memes are fetched even when only_comics is set, as
            # before; get() may have side effects, so this is left unchanged.
            memes: list = await _safe_get(memeg.MemeGrabber(), "Meme")
            explosm_comics: list = await _safe_get(explosmg.ExplosmGrabber(), "Explosm")
            xkcd_comics: list = await _safe_get(xkcdg.XKCDGrabber(), "XKCD")
            smbc_comics: list = await _safe_get(smbcg.SMBCGrabber(), "SMBC")
            qc_comics: list = await _safe_get(qcg.QCGrabber(), "QC")
            logging.debug("QC: %s", qc_comics)
            dino_comics: list = await _safe_get(dinog.DinosaurGrabber(), "Dino")
            onions: list = await _safe_get(oniong.OnionGrabber(), "Onion")
            thns: list = await _safe_get(thng.THNGrabber(), "THN")

            agents: list[str] = constants.HTTP_UA_LIST
            headers: dict = {
                'User-Agent': random.choice(agents)
            }

            if not only_comics:
                await self._post_memes(memes, headers)

            await self._post_comics(explosm_comics, 'comic_explosm',
                                    "Cyanide & Happiness", headers)
            await self._post_comics(xkcd_comics, 'comic_xkcd', "xkcd", headers)
            await self._post_comics(smbc_comics, 'comic_smbc', "SMBC", headers)
            await self._post_comics(qc_comics, 'comic_qc', "Questionable Content",
                                    headers, fix_url=_fix_qc_url)
            await self._post_comics(dino_comics, 'comic_dino', "Dinosaur Comics",
                                    headers)

            for onion in onions:
                if not onion:
                    continue
                try:
                    (onion_title, onion_description, onion_link, onion_video) = onion
                    embed: discord.Embed = _article_embed(
                        onion_title, onion_description, onion_link)
                    await self._broadcast_embed(embed, 'theonion', "The Onion",
                                                onion_video)
                except Exception:
                    logging.exception("Onion post failed: %s", onion)

            for thn in thns:
                logging.debug("Trying thn...")
                if not thn:
                    continue
                try:
                    (thn_title, thn_description, thn_link,
                     thn_pubdate, thn_video) = thn
                    embed = _article_embed(thn_title, thn_description, thn_link)
                    embed.add_field(name="Published", value=thn_pubdate, inline=False)
                    await self._broadcast_embed(embed, 'thn', "The Hacker News",
                                                thn_video)
                except Exception:
                    logging.exception("THN post failed: %s", thn)
        except Exception:
            # Exception (not bare except) so task cancellation still propagates
            traceback.print_exc()

    @tasks.loop(hours=12.0)
    async def meme_stream_loop(self) -> None:
        """Meme Stream Loop (r/memes)"""
        try:
            await asyncio.sleep(10)  # Try to ensure we are ready first
            self.meme_counter += 1
            if self.meme_counter == 1:
                return await self.do_autos(only_comics=True)  # Skip first iteration!
            await self.do_autos()
        except Exception:
            traceback.print_exc()

    @tasks.loop(hours=0.5)
    async def explosm_loop(self) -> None:
        """Comic Loop"""
        try:
            await asyncio.sleep(10)  # Try to ensure we are ready first
            await self.do_autos(only_comics=True)
        except Exception:
            traceback.print_exc()

    @bridge.bridge_command()  # type: ignore
    @is_spamchan()
    async def meme(self, ctx) -> None:
        """Create Meme"""
        await ctx.respond(view=MemeView())

    @bridge.bridge_command(hidden=True)
    @commands.is_owner()
    async def domemestream(self, ctx) -> None:
        """Run Meme Stream Auto Post"""
        try:
            await ctx.respond("Trying!", ephemeral=True)
            await self.do_autos()
        except Exception:
            await ctx.respond("Fuck! :(", ephemeral=True)
            traceback.print_exc()

    @bridge.bridge_command(hidden=True)
    @commands.is_owner()
    async def doexplosm(self, ctx) -> None:
        """Run Comic Auto Posters"""
        try:
            await ctx.respond("Trying!", ephemeral=True)
            await self.do_autos(only_comics=True)
        except Exception:
            await ctx.respond("Fuck! :(", ephemeral=True)
            traceback.print_exc()

    def cog_unload(self) -> None:
        """Cancel both background loops when the cog is unloaded"""
        self.meme_stream_loop.cancel()
        self.explosm_loop.cancel()


def setup(bot) -> None:
    """Run on Cog Load"""
    bot.add_cog(Meme(bot))