#!/usr/bin/env python3.12 import os import traceback import json import io import asyncio import random from typing import LiteralString, Optional, Any import logging import textwrap import regex import requests import discord from disc_havoc import Havoc from aiohttp import ClientSession from discord.ext import bridge, commands, tasks from jesusmemes import JesusMemeGenerator import scrapers.reddit_scrape as memeg import scrapers.explosm_scrape as explosmg import scrapers.xkcd_scrape as xkcdg import scrapers.smbc_scrape as smbcg import scrapers.qc_scrape as qcg import scrapers.dinosaur_scrape as dinog import scrapers.onion_scrape as oniong import scrapers.thn_scrape as thng import constants # pylint: disable=global-statement, bare-except, invalid-name, line-too-long meme_choices = [] BOT_CHANIDS = [] class Helper: """Meme Helper""" def load_meme_choices(self) -> None: """Load Available Meme Templates from JSON File""" global meme_choices memes_file: str|LiteralString = os.path.join(os.path.dirname(__file__), "memes.json") with open(memes_file, 'r', encoding='utf-8') as f: meme_choices = json.loads(f.read()) class MemeView(discord.ui.View): """Meme Selection discord.ui.View""" helper = Helper() helper.load_meme_choices() @discord.ui.select( placeholder = "Choose a Meme!", min_values = 1, max_values = 1, options = [ discord.SelectOption( label=meme_label ) for meme_label in meme_choices[0:24] ] ) async def select_callback(self, select: discord.ui.Select, interaction: discord.Interaction) -> None: """Meme Selection Callback""" if not isinstance(select.values[0], str): return modal: discord.ui.Modal = MemeModal(meme=select.values[0], title="Meme Selected") await interaction.response.send_modal(modal) class MemeModal(discord.ui.Modal): """Meme Creation discord.ui.Modal""" def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None: super().__init__(*args, **kwargs) self.selected_meme: Optional[str] = meme self.meme_generator = JesusMemeGenerator() self.TEXT_LIMIT: int = 80 self.add_item(discord.ui.InputText(label="Top Text", style=discord.InputTextStyle.singleline)) self.add_item(discord.ui.InputText(label="Bottom Text", style=discord.InputTextStyle.singleline)) async def callback(self, interaction: discord.Interaction) -> None: if not self.selected_meme: # No meme selected return selected_meme: str = self.selected_meme if not self.children or len(self.children) < 2: # Invalid request return if not isinstance(self.children[0].value, str)\ or not isinstance(self.children[1].value, str): # Invalid request return meme_top_line: str = self.children[0].value.strip() meme_bottom_line: str = self.children[1].value.strip() if len(meme_top_line) > self.TEXT_LIMIT or len(meme_bottom_line) > self.TEXT_LIMIT: await interaction.response.send_message("ERR: Text is limited to 80 characters for each the top and bottom lines.") return meme_link: str = await self.meme_generator.create_meme(top_line=meme_top_line, bottom_line=meme_bottom_line, meme=selected_meme) embed: discord.Embed = discord.Embed(title="Generated Meme") embed.set_image(url=meme_link) embed.add_field(name="Meme", value=selected_meme, inline=True) await interaction.response.send_message(embeds=[embed]) return class Meme(commands.Cog): """Meme Cog for Havoc""" def __init__(self, bot: Havoc) -> None: self.bot: Havoc = bot self.meme_choices: list = [] self.meme_counter: int = 0 self.THREADS: dict[str, dict[int, list]] = { # Format: Guild1: [ChanId : [Webhook, ThreadId], Guild2: [ChanId : [Webhook, ThreadId] 'comic_explosm': { 1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367], 1306414795049926676: [constants.EXPLOSM_WEBHOOK2, 1306416492304138364], }, 'comic_xkcd': { 1298729744216359055: [constants.XKCD_WEBHOOK, 1299165928755433483], 1306414795049926676: [constants.XKCD_WEBHOOK2, 1306416681991798854], }, 'comic_smbc': { 1298729744216359055: [constants.SMBC_WEBHOOK, 1299166071038808104], 1306414795049926676: [constants.SMBC_WEBHOOK2, 1306416842511745024], }, 'comic_qc': { 1298729744216359055: [constants.QC_WEBHOOK, 1299392115364593674], 1306414795049926676: [constants.QC_WEBHOOK2, 1306417084774744114], }, 'comic_dino': { 1298729744216359055: [constants.DINO_WEBHOOK, 1299771918886506557], 1306414795049926676: [constants.DINO_WEBHOOK2, 1306417286713704548], } } self.NO_THREAD_WEBHOOKS: dict[str, list] = { 'theonion': [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2], 'thn': [constants.THN_WEBHOOK], 'memes': [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2], } global BOT_CHANIDS BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit self.meme_stream_loop.start() self.explosm_loop.start() def is_spamchan() -> bool: # type: ignore """Check if channel is spamchan""" def predicate(ctx): try: if not ctx.channel.id in BOT_CHANIDS: logging.debug("%s not found in %s", ctx.channel.id, BOT_CHANIDS) return ctx.channel.id in BOT_CHANIDS except: traceback.print_exc() return False return commands.check(predicate) # type: ignore async def do_autos(self, only_comics: Optional[bool] = False) -> None: """ Run Auto Posters Args: only_comics (Optional[bool]): default False Returns: None """ try: meme_grabber = memeg.MemeGrabber() explosm_grabber = explosmg.ExplosmGrabber() xkcd_grabber = xkcdg.XKCDGrabber() smbc_grabber = smbcg.SMBCGrabber() qc_grabber = qcg.QCGrabber() dino_grabber = dinog.DinosaurGrabber() onion_grabber = oniong.OnionGrabber() thn_grabber = thng.THNGrabber() explosm_comics: list[Optional[tuple]] = [] xkcd_comics: list[Optional[tuple]] = [] smbc_comics: list[Optional[tuple]] = [] dino_comics: list[Optional[tuple]] = [] onions: list[Optional[tuple]] = [] thns: list[Optional[tuple]] = [] memes: list[Optional[tuple]] = await meme_grabber.get() try: try: explosm_comics = await explosm_grabber.get() except: pass try: xkcd_comics = await xkcd_grabber.get() except: pass try: smbc_comics = await smbc_grabber.get() except: pass try: qc_comics = await qc_grabber.get() print(f"QC: {qc_comics}") except: pass try: dino_comics = await dino_grabber.get() except Exception as e: logging.debug("Dino failed: %s", str(e)) pass try: onions = await onion_grabber.get() except Exception as e: logging.debug("Onion failed: %s", str(e)) pass try: thns = await thn_grabber.get() except Exception as e: logging.debug("THNs failed: %s", str(e)) pass except: traceback.print_exc() agents: list[str] = constants.HTTP_UA_LIST headers: dict = { 'User-Agent': random.choice(agents) } if not only_comics: try: for meme in memes: if not meme: continue (meme_id, meme_title, meme_url) = meme # pylint: disable=unused-variable request = requests.get(meme_url, stream=True, timeout=(5, 30), headers=headers) if not request.status_code == 200: continue meme_content: bytes = request.raw.read() for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes', {}): meme_image: io.BytesIO = io.BytesIO(meme_content) ext: str = meme_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: webhook: discord.Webhook = discord.Webhook.from_url(meme_hook, session=session) await webhook.send(file=discord.File(meme_image, filename=f'img.{ext}'), username="r/memes") await asyncio.sleep(2) except: pass try: for comic in explosm_comics: if not comic: continue (comic_title, comic_url) = comic comic_title = discord.utils.escape_markdown(comic_title) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() comic_content: bytes = comic_request.raw.read() ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: for chanid, _hook in self.THREADS.get('comic_explosm', {}).items(): comic_image: io.BytesIO = io.BytesIO(comic_content) channel: int = chanid (hook_uri, thread_id) = _hook webhook = discord.Webhook.from_url(hook_uri, session=session) _channel: Any = self.bot.get_channel(channel) if not _channel: return thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="Cyanide & Happiness", thread=thread) await asyncio.sleep(2) except: pass try: for comic in xkcd_comics: if not comic: continue (comic_title, comic_url) = comic comic_title = discord.utils.escape_markdown(comic_title) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() comic_content = comic_request.raw.read() comic_image = io.BytesIO(comic_request.raw.read()) ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: for chanid, _hook in self.THREADS.get('comic_xkcd', {}).items(): comic_image = io.BytesIO(comic_content) channel = chanid (hook_uri, thread_id) = _hook webhook = discord.Webhook.from_url(hook_uri, session=session) _channel = self.bot.get_channel(channel) if not _channel: return thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="xkcd", thread=thread) await asyncio.sleep(2) except: pass try: for comic in smbc_comics: if not comic: continue (comic_title, comic_url) = comic comic_title = discord.utils.escape_markdown(comic_title) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() comic_content = comic_request.raw.read() ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: for chanid, _hook in self.THREADS.get('comic_smbc', {}).items(): comic_image = io.BytesIO(comic_content) channel = chanid (hook_uri, thread_id) = _hook webhook = discord.Webhook.from_url(hook_uri, session=session) _channel = self.bot.get_channel(channel) if not _channel: return thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="SMBC", thread=thread) await asyncio.sleep(2) except: pass try: for comic in qc_comics: logging.debug("Trying QC...") if not comic: continue (comic_title, comic_url) = comic comic_title = discord.utils.escape_markdown(comic_title) comic_url = regex.sub(r'^http://ww\.', 'http://www.', comic_url) comic_url = regex.sub(r'\.pmg$', '.png', comic_url) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() comic_content = comic_request.raw.read() ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: for chanid, _hook in self.THREADS.get('comic_qc', {}).items(): comic_image = io.BytesIO(comic_content) channel = chanid (hook_uri, thread_id) = _hook webhook = discord.Webhook.from_url(hook_uri, session=session) _channel = self.bot.get_channel(channel) if not _channel: return thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="Questionable Content", thread=thread) await asyncio.sleep(2) except: traceback.print_exc() pass try: for comic in dino_comics: if not comic: continue (comic_title, comic_url) = comic comic_title = discord.utils.escape_markdown(comic_title) comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers) comic_request.raise_for_status() comic_content = comic_request.raw.read() ext = comic_url.split(".")[-1]\ .split("?")[0].split("&")[0] async with ClientSession() as session: for chanid, _hook in self.THREADS.get('comic_dino', {}).items(): comic_image = io.BytesIO(comic_content) channel = chanid (hook_uri, thread_id) = _hook webhook = discord.Webhook.from_url(hook_uri, session=session) _channel = self.bot.get_channel(channel) if not _channel: return thread = _channel.get_thread(thread_id) await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'), username="Dinosaur Comics", thread=thread) await asyncio.sleep(2) except: pass try: for onion in onions: if not onion: continue (onion_title, onion_description, onion_link, onion_video) = onion onion_description = textwrap.wrap(text=onion_description, width=860, max_lines=1)[0] embed: discord.Embed = discord.Embed(title=onion_title) embed.add_field(name="Content", value=f"{onion_description[0:960]}\n-# {onion_link}") async with ClientSession() as session: for hook in self.NO_THREAD_WEBHOOKS.get('theonion', {}): hook_uri = hook webhook = discord.Webhook.from_url(hook_uri, session=session) await webhook.send(embed=embed, username="The Onion") if onion_video: await webhook.send(f"^ video: {onion_video}") await asyncio.sleep(2) except: pass try: for thn in thns: logging.debug("Trying thn...") if not thn: continue (thn_title, thn_description, thn_link, thn_pubdate, thn_video) = thn thn_description = textwrap.wrap(text=thn_description, width=860, max_lines=1)[0] embed = discord.Embed(title=thn_title) embed.add_field(name="Content", value=f"{thn_description[0:960]}\n-# {thn_link}") embed.add_field(name="Published", value=thn_pubdate, inline=False) async with ClientSession() as session: for hook in self.NO_THREAD_WEBHOOKS.get('thn', {}): hook_uri = hook webhook = discord.Webhook.from_url(hook_uri, session=session) await webhook.send(embed=embed, username="The Hacker News") if thn_video: await webhook.send(f"^ video: {thn_video}") await asyncio.sleep(2) except: pass except: # await self.bot.get_channel(self.MEMESTREAM_CHANID).send(f"FUCK, MY MEEMER! YOU DENTED MY MEEMER!") traceback.print_exc() @tasks.loop(hours=12.0) async def meme_stream_loop(self) -> None: """Meme Stream Loop (r/memes)""" try: await asyncio.sleep(10) # Try to ensure we are ready first self.meme_counter += 1 if self.meme_counter == 1: return await self.do_autos(only_comics=True) # Skip first iteration! await self.do_autos() except: traceback.print_exc() @tasks.loop(hours=0.5) async def explosm_loop(self) -> None: """Comic Loop""" try: await asyncio.sleep(10) # Try to ensure we are ready first await self.do_autos(only_comics=True) except: traceback.print_exc() @bridge.bridge_command() # type: ignore @is_spamchan() async def meme(self, ctx) -> None: """Create Meme""" await ctx.respond(view=MemeView()) @bridge.bridge_command(hidden=True) @commands.is_owner() async def domemestream(self, ctx) -> None: """Run Meme Stream Auto Post""" try: await ctx.respond("Trying!", ephemeral=True) await self.do_autos() except: await ctx.respond("Fuck! :(", ephemeral=True) traceback.print_exc() @bridge.bridge_command(hidden=True) @commands.is_owner() async def doexplosm(self, ctx) -> None: """Run Comic Auto Posters""" try: await ctx.respond("Trying!", ephemeral=True) await self.do_autos(only_comics=True) except: await ctx.respond("Fuck! :(", ephemeral=True) traceback.print_exc() def cog_unload(self) -> None: self.meme_stream_loop.cancel() self.explosm_loop.cancel() def setup(bot) -> None: """Run on Cog Load""" bot.add_cog(Meme(bot))