"""Meme cog: meme generator UI, comic/news auto-posters and the meme leaderboard."""

import asyncio
import io
import json
import logging
import os
import random
import textwrap
import traceback
from typing import Any, Callable, LiteralString, Optional, Union

import aiosqlite as sqlite3
import discord
import regex
import requests
from aiohttp import ClientSession
from discord.ext import bridge, commands, tasks

import constants
import scrapers.dinosaur_scrape as dinog
import scrapers.explosm_scrape as explosmg
import scrapers.onion_scrape as oniong
import scrapers.qc_scrape as qcg
import scrapers.reddit_scrape as memeg
import scrapers.smbc_scrape as smbcg
import scrapers.thn_scrape as thng
import scrapers.xkcd_scrape as xkcdg
from disc_havoc import Havoc
from util.jesusmemes import JesusMemeGenerator

# Populated by Helper.load_meme_choices() (meme template names from memes.json).
meme_choices = []
# Rebound in Meme.__init__ to the bot's own BOT_CHANIDS.
BOT_CHANIDS = []

# TODO: Cleanup new meme leaderboard stuff


class Helper:
    """Meme Helper"""

    def load_meme_choices(self) -> None:
        """Load Available Meme Templates from JSON File

        Reads ``memes.json`` from this module's directory and stores the
        decoded value in the module-level ``meme_choices``.
        """
        global meme_choices
        memes_file: str | LiteralString = os.path.join(
            os.path.dirname(__file__), "memes.json"
        )
        with open(memes_file, "r", encoding="utf-8") as f:
            meme_choices = json.load(f)


class MemeView(discord.ui.View):
    """Meme Selection discord.ui.View"""

    # Must run before the decorator below is evaluated: the select options are
    # built from the module-level meme_choices at class-definition time.
    helper = Helper()
    helper.load_meme_choices()

    @discord.ui.select(
        placeholder="Choose a Meme!",
        min_values=1,
        max_values=1,
        options=[
            discord.SelectOption(label=meme_label) for meme_label in meme_choices[0:24]
        ],
    )
    async def select_callback(
        self, select: discord.ui.Select, interaction: discord.Interaction
    ) -> None:
        """Meme Selection Callback: open the text-entry modal for the chosen meme."""
        if not select.values or not isinstance(select.values[0], str):
            return
        modal: discord.ui.Modal = MemeModal(
            meme=select.values[0], title="Meme Selected"
        )
        await interaction.response.send_modal(modal)


class MemeModal(discord.ui.Modal):
    """Meme Creation discord.ui.Modal"""

    def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None:
        """
        Args:
            meme (Optional[str]): Name of the selected meme template
        """
        super().__init__(*args, **kwargs)
        self.selected_meme: Optional[str] = meme
        self.meme_generator = JesusMemeGenerator()
        self.TEXT_LIMIT: int = 80  # Max characters per line (top/bottom)
        self.add_item(
            discord.ui.InputText(
                label="Top Text", style=discord.InputTextStyle.singleline
            )
        )
        self.add_item(
            discord.ui.InputText(
                label="Bottom Text", style=discord.InputTextStyle.singleline
            )
        )

    async def callback(self, interaction: discord.Interaction) -> None:
        """Validate the submitted text, generate the meme and reply with an embed."""
        if not self.selected_meme:  # No meme selected
            return
        selected_meme: str = self.selected_meme
        if not self.children or len(self.children) < 2:  # Invalid request
            return
        top_value = self.children[0].value
        bottom_value = self.children[1].value
        if not isinstance(top_value, str) or not isinstance(bottom_value, str):
            return  # Invalid request
        meme_top_line: str = top_value.strip()
        meme_bottom_line: str = bottom_value.strip()
        if (
            len(meme_top_line) > self.TEXT_LIMIT
            or len(meme_bottom_line) > self.TEXT_LIMIT
        ):
            await interaction.response.send_message(
                f"ERR: Text is limited to {self.TEXT_LIMIT} characters for each the top and bottom lines."
            )
            return

        meme_link: Optional[str] = await self.meme_generator.create_meme(
            top_line=meme_top_line, bottom_line=meme_bottom_line, meme=selected_meme
        )
        if not meme_link:
            await interaction.response.send_message("Failed!")
            return
        embed: discord.Embed = discord.Embed(title="Generated Meme")
        embed.set_image(url=meme_link)
        embed.add_field(name="Meme", value=selected_meme, inline=True)
        await interaction.response.send_message(embeds=[embed])


class Meme(commands.Cog):
    """Meme Cog for Havoc"""

    # Discord snowflakes this cog is bound to.
    HOME_GUILD_ID: int = 1145182936002482196  # Guild whose members appear on the leaderboard
    MEME_CHANID: int = 1147229098544988261  # Meme channel (attachments here are counted)
    LB_CHANID: int = 1352373745108652145  # #memes-top-10 leaderboard channel
    LB_MESSAGE_ID: int = 1352440888231723070  # Leaderboard message that gets edited

    def __init__(self, bot: Havoc) -> None:
        self.bot: Havoc = bot
        self.stats_db_path: LiteralString = os.path.join(
            "/usr/local/share", "sqlite_dbs", "stats.db"
        )
        self.meme_choices: list = []
        self.meme_counter: int = 0
        # Initialised here so on_message can never hit a missing attribute
        # before init_meme_leaderboard() has completed.
        self.meme_leaderboard: dict[int, int] = {}
        self.THREADS: dict[str, dict[int, list]] = {
            # Format: feed key -> {channel id: [webhook url, thread id]}
            "comic_explosm": {
                1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367],
                1306414795049926676: [constants.EXPLOSM_WEBHOOK2, 1306416492304138364],
            },
            "comic_xkcd": {
                1298729744216359055: [constants.XKCD_WEBHOOK, 1299165928755433483],
                1306414795049926676: [constants.XKCD_WEBHOOK2, 1306416681991798854],
            },
            "comic_smbc": {
                1298729744216359055: [constants.SMBC_WEBHOOK, 1299166071038808104],
                1306414795049926676: [constants.SMBC_WEBHOOK2, 1306416842511745024],
            },
            "comic_qc": {
                1298729744216359055: [constants.QC_WEBHOOK, 1299392115364593674],
                1306414795049926676: [constants.QC_WEBHOOK2, 1306417084774744114],
            },
            "comic_dino": {
                1298729744216359055: [constants.DINO_WEBHOOK, 1299771918886506557],
                1306414795049926676: [constants.DINO_WEBHOOK2, 1306417286713704548],
            },
        }
        # Feeds posted straight to a webhook (no thread): feed key -> webhook urls
        self.NO_THREAD_WEBHOOKS: dict[str, list] = {
            "theonion": [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2],
            "thn": [constants.THN_WEBHOOK],
            "memes": [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2],
        }

        global BOT_CHANIDS
        BOT_CHANIDS = self.bot.BOT_CHANIDS  # Inherit

        self.meme_stream_loop.start()
        self.explosm_loop.start()
        self.update_meme_lb.start()
        asyncio.get_event_loop().create_task(self.init_meme_leaderboard())

    def is_spamchan() -> bool:  # type: ignore
        """Check if channel is spamchan (command check decorator factory)"""

        def predicate(ctx):
            try:
                allowed: bool = ctx.channel.id in BOT_CHANIDS
                if not allowed:
                    logging.debug("%s not found in %s", ctx.channel.id, BOT_CHANIDS)
                return allowed
            except Exception:
                traceback.print_exc()
                return False

        return commands.check(predicate)  # type: ignore

    async def leaderboard_increment(self, uid: int) -> None:
        """
        Increment leaderboard for uid (in memory and in the stats DB)
        Args:
            uid (int): Discord user id
        Returns:
            None
        """
        self.meme_leaderboard[uid] = self.meme_leaderboard.get(uid, 0) + 1

        update_query: str = "UPDATE memes SET count = count + 1 WHERE discord_uid = ?"
        insert_query: str = "INSERT INTO memes (discord_uid, count) VALUES (?, ?)"
        async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
            try:
                # Update first; only insert when no existing row was touched, so
                # a user never ends up with two rows or a silently failed insert.
                cursor = await db_conn.execute(update_query, (uid,))
                if cursor.rowcount == 0:
                    await db_conn.execute(
                        insert_query, (uid, self.meme_leaderboard[uid])
                    )
                await db_conn.commit()
            except Exception as e:
                # Best effort: the in-memory counter is already updated.
                logging.info("Failed to persist meme count for %s: %s", uid, str(e))
        try:
            await self.update_meme_lb()
        except Exception as e:
            logging.info(
                "Failed to update meme leaderboard following increment: %s", str(e)
            )

    async def init_meme_leaderboard(self) -> None:
        """
        INIT MEME LEADERBOARD

        Loads all rows with count > 0 from the memes table into
        self.meme_leaderboard.
        """
        loaded: dict[int, int] = {}
        async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            db_query: str = "SELECT discord_uid, count FROM memes WHERE count > 0"
            async with db_conn.execute(db_query) as db_cursor:
                for result in await db_cursor.fetchall():
                    loaded[result["discord_uid"]] = result["count"]
        # Swap in one step so readers never observe a half-filled leaderboard.
        self.meme_leaderboard = loaded

    @commands.Cog.listener()
    async def on_ready(self) -> None:
        """Run on Bot Ready"""
        await self.init_meme_leaderboard()

    @staticmethod
    async def _grab(grabber: Any, label: str) -> list:
        """Run one scraper's get(); return [] (and log) instead of raising."""
        try:
            return await grabber.get() or []
        except Exception as e:
            logging.debug("%s failed: %s", label, str(e))
            return []

    @staticmethod
    def _download(url: str, headers: dict, timeout: tuple) -> bytes:
        """Blocking download of url's raw body; raises on HTTP error status."""
        with requests.get(
            url, stream=True, timeout=timeout, headers=headers
        ) as response:
            response.raise_for_status()
            return response.raw.read()

    @staticmethod
    def _url_extension(url: str) -> str:
        """Return the file extension of url with any query string stripped."""
        return url.split(".")[-1].split("?")[0].split("&")[0]

    @staticmethod
    def _fix_qc_url(url: str) -> str:
        """Repair the known typos seen in Questionable Content image URLs."""
        url = regex.sub(r"^http://ww\.", "http://www.", url)
        return regex.sub(r"\.pmg$", ".png", url)

    @staticmethod
    def _summarize(description: Optional[str]) -> str:
        """Truncate a feed description to one wrapped line; '' when empty."""
        wrapped: list[str] = textwrap.wrap(
            text=description or "", width=860, max_lines=1
        )
        return wrapped[0][0:960] if wrapped else ""

    async def _post_memes(self, memes: list, headers: dict) -> None:
        """Download each meme and post it to every 'memes' webhook."""
        hooks: list = self.NO_THREAD_WEBHOOKS.get("memes", [])
        for meme in memes:
            if not meme:
                continue
            try:
                (_meme_id, _meme_title, meme_url) = meme
                # requests is blocking; keep it off the event loop.
                meme_content: bytes = await asyncio.to_thread(
                    self._download, meme_url, headers, (5, 30)
                )
                ext: str = self._url_extension(meme_url)
                async with ClientSession() as session:
                    for meme_hook in hooks:
                        webhook: discord.Webhook = discord.Webhook.from_url(
                            meme_hook, session=session
                        )
                        await webhook.send(
                            file=discord.File(
                                io.BytesIO(meme_content), filename=f"img.{ext}"
                            ),
                            username="r/memes",
                        )
                        await asyncio.sleep(2)
            except Exception:
                logging.exception("Failed to post meme: %s", meme)

    async def _post_comics(
        self,
        comics: list,
        thread_key: str,
        username: str,
        headers: dict,
        fix_url: Optional[Callable[[str], str]] = None,
    ) -> None:
        """Download each (title, url) comic and post it to the feed's threads.

        Args:
            comics (list): (title, url) tuples; falsy entries are skipped
            thread_key (str): Key into self.THREADS
            username (str): Webhook display name
            headers (dict): HTTP headers for the image download
            fix_url (Optional[Callable]): Optional URL repair hook
        """
        targets: dict[int, list] = self.THREADS.get(thread_key, {})
        for comic in comics:
            if not comic:
                continue
            logging.debug("Trying %s...", username)
            try:
                (comic_title, comic_url) = comic
                comic_title = discord.utils.escape_markdown(comic_title)
                if fix_url:
                    comic_url = fix_url(comic_url)
                # requests is blocking; keep it off the event loop.
                comic_content: bytes = await asyncio.to_thread(
                    self._download, comic_url, headers, (5, 20)
                )
                ext: str = self._url_extension(comic_url)
                async with ClientSession() as session:
                    for chanid, (hook_uri, thread_id) in targets.items():
                        _channel: Any = self.bot.get_channel(chanid)
                        if not _channel:
                            # Skip only this target; other channels/feeds still run.
                            logging.debug("Channel %s not found, skipping", chanid)
                            continue
                        thread = _channel.get_thread(thread_id)
                        if not thread:
                            logging.debug("Thread %s not found, skipping", thread_id)
                            continue
                        webhook: discord.Webhook = discord.Webhook.from_url(
                            hook_uri, session=session
                        )
                        await webhook.send(
                            f"**{comic_title}**",
                            file=discord.File(
                                io.BytesIO(comic_content), filename=f"img.{ext}"
                            ),
                            username=username,
                            thread=thread,
                        )
                        await asyncio.sleep(2)
            except Exception:
                logging.exception("Failed to post %s comic: %s", username, comic)

    async def _send_article(
        self,
        hook_key: str,
        username: str,
        embed: discord.Embed,
        video: Optional[str],
    ) -> None:
        """Send an article embed (plus optional video link) to a feed's webhooks."""
        async with ClientSession() as session:
            for hook_uri in self.NO_THREAD_WEBHOOKS.get(hook_key, []):
                webhook: discord.Webhook = discord.Webhook.from_url(
                    hook_uri, session=session
                )
                await webhook.send(embed=embed, username=username)
                if video:
                    await webhook.send(f"^ video: {video}")
                await asyncio.sleep(2)

    async def _post_onions(self, onions: list) -> None:
        """Post The Onion items: (title, description, link, video) tuples."""
        for onion in onions:
            if not onion:
                continue
            try:
                (onion_title, onion_description, onion_link, onion_video) = onion
                embed: discord.Embed = discord.Embed(title=onion_title)
                embed.add_field(
                    name="Content",
                    value=f"{self._summarize(onion_description)}\n-# {onion_link}",
                )
                await self._send_article("theonion", "The Onion", embed, onion_video)
            except Exception:
                logging.exception("Failed to post Onion item: %s", onion)

    async def _post_thns(self, thns: list) -> None:
        """Post THN items: (title, description, link, pubdate, video) tuples."""
        for thn in thns:
            logging.debug("Trying thn...")
            if not thn:
                continue
            try:
                (thn_title, thn_description, thn_link, thn_pubdate, thn_video) = thn
                embed: discord.Embed = discord.Embed(title=thn_title)
                embed.add_field(
                    name="Content",
                    value=f"{self._summarize(thn_description)}\n-# {thn_link}",
                )
                embed.add_field(name="Published", value=thn_pubdate, inline=False)
                await self._send_article("thn", "The Hacker News", embed, thn_video)
            except Exception:
                logging.exception("Failed to post THN item: %s", thn)

    async def do_autos(self, only_comics: Optional[bool] = False) -> None:
        """
        Run Auto Posters

        Fetches every feed, then posts comics and news; memes are posted only
        when only_comics is falsy. A failure in one feed or item is logged and
        does not stop the remaining ones.
        Args:
            only_comics (Optional[bool]): default False
        Returns:
            None
        """
        try:
            meme_grabber = memeg.MemeGrabber()
            explosm_grabber = explosmg.ExplosmGrabber()
            xkcd_grabber = xkcdg.XKCDGrabber()
            smbc_grabber = smbcg.SMBCGrabber()
            qc_grabber = qcg.QCGrabber()
            dino_grabber = dinog.DinosaurGrabber()
            onion_grabber = oniong.OnionGrabber()
            thn_grabber = thng.THNGrabber()

            # NOTE(review): memes are fetched even on comics-only runs, as before;
            # the grabber may track what it has already returned -- confirm
            # before skipping this call when only_comics is set.
            memes: list = await self._grab(meme_grabber, "Memes")
            explosm_comics: list = await self._grab(explosm_grabber, "Explosm")
            xkcd_comics: list = await self._grab(xkcd_grabber, "XKCD")
            smbc_comics: list = await self._grab(smbc_grabber, "SMBC")
            qc_comics: list = await self._grab(qc_grabber, "QC")
            logging.debug("QC: %s", qc_comics)
            dino_comics: list = await self._grab(dino_grabber, "Dino")
            onions: list = await self._grab(onion_grabber, "Onion")
            thns: list = await self._grab(thn_grabber, "THNs")

            agents: list[str] = constants.HTTP_UA_LIST
            headers: dict = {"User-Agent": random.choice(agents)}

            if not only_comics:
                await self._post_memes(memes, headers)
            await self._post_comics(
                explosm_comics, "comic_explosm", "Cyanide & Happiness", headers
            )
            await self._post_comics(xkcd_comics, "comic_xkcd", "xkcd", headers)
            await self._post_comics(smbc_comics, "comic_smbc", "SMBC", headers)
            await self._post_comics(
                qc_comics,
                "comic_qc",
                "Questionable Content",
                headers,
                fix_url=self._fix_qc_url,
            )
            await self._post_comics(
                dino_comics, "comic_dino", "Dinosaur Comics", headers
            )
            await self._post_onions(onions)
            await self._post_thns(thns)
        except Exception:
            # `except Exception` (not bare) so task cancellation still propagates.
            traceback.print_exc()

    @tasks.loop(hours=12.0)
    async def meme_stream_loop(self) -> None:
        """Meme Stream Loop (r/memes)"""
        try:
            await asyncio.sleep(10)  # Try to ensure we are ready first
            self.meme_counter += 1
            if self.meme_counter == 1:
                # Skip memes on the first iteration!
                return await self.do_autos(only_comics=True)
            await self.do_autos()
        except Exception:
            # CancelledError is not caught here, so cog_unload can stop the loop.
            traceback.print_exc()

    @tasks.loop(hours=0.5)
    async def explosm_loop(self) -> None:
        """Comic Loop"""
        try:
            await asyncio.sleep(10)  # Try to ensure we are ready first
            await self.do_autos(only_comics=True)
        except Exception:
            # CancelledError is not caught here, so cog_unload can stop the loop.
            traceback.print_exc()

    @bridge.bridge_command()  # type: ignore
    @is_spamchan()
    async def meme(self, ctx) -> None:
        """Create Meme"""
        await ctx.respond(view=MemeView())

    @bridge.bridge_command(hidden=True)
    @commands.is_owner()
    async def domemestream(self, ctx) -> None:
        """Run Meme Stream Auto Post"""
        try:
            await ctx.respond("Trying!", ephemeral=True)
            await self.do_autos()
        except Exception:
            await ctx.respond("Fuck! :(", ephemeral=True)
            traceback.print_exc()

    @bridge.bridge_command(hidden=True)
    @commands.is_owner()
    async def doexplosm(self, ctx) -> None:
        """Run Comic Auto Posters"""
        try:
            await ctx.respond("Trying!", ephemeral=True)
            await self.do_autos(only_comics=True)
        except Exception:
            await ctx.respond("Fuck! :(", ephemeral=True)
            traceback.print_exc()

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message) -> None:
        """
        Message hook, to monitor for memes
        Also monitors for messages to #memes-top-10 to autodelete, only Havoc may post in #memes-top-10!
        """
        if not self.bot.user:  # No valid client instance
            return
        if not isinstance(message.channel, discord.TextChannel):
            return
        is_own_message: bool = message.author.id == self.bot.user.id

        if message.channel.id == self.LB_CHANID and not is_own_message:
            # Message to #memes-top-10 not by Havoc, delete it
            try:
                await message.delete(
                    reason=f"Messages to #{message.channel.name} are not allowed"
                )
            except discord.HTTPException as e:
                logging.info("Failed to delete message in leaderboard channel: %s", e)
                return
            removal_embed: discord.Embed = discord.Embed(
                title="Message Deleted",
                description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed.",
            )
            try:
                await message.author.send(embed=removal_embed)
            except discord.HTTPException as e:
                # e.g. the author has DMs closed; the deletion already happened.
                logging.debug("Could not DM %s: %s", message.author.id, e)
            return

        if is_own_message:  # Bots own message
            return
        if not message.guild:
            return
        if message.channel.id != self.MEME_CHANID:  # Not meme channel
            return
        if not message.attachments:  # No attachments to consider a meme
            return

        await self.leaderboard_increment(message.author.id)

    async def get_top(self, n: int = 10) -> Optional[list[tuple]]:
        """
        Get top (n=10) Memes
        Args:
            n (int): Number of top results to return, default 10
        Returns:
            Optional[list[tuple]]: (discord_uid, count) tuples ordered by count
                descending, restricted to current guild members; None if the
                guild is unavailable or the query fails
        """
        try:
            out_top: list[tuple[int, int]] = []
            async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
                db_conn.row_factory = sqlite3.Row
                query: str = (
                    "SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC"
                )
                async with db_conn.execute(query) as db_cursor:
                    for res in await db_cursor.fetchall():
                        out_top.append((res["discord_uid"], res["count"]))

            guild: Optional[discord.Guild] = self.bot.get_guild(self.HOME_GUILD_ID)
            if not guild:
                return None
            # Drop users no longer in the guild. Build a new list rather than
            # popping while iterating, which would skip the following entry.
            present: list[tuple[int, int]] = [
                entry for entry in out_top if guild.get_member(entry[0])
            ]
            return present[: max(n, 0)]
        except Exception:
            traceback.print_exc()
            return None

    async def get_top_embed(self, n: int = 10) -> Optional[discord.Embed]:
        """
        Get Top Memes Embed
        Args:
            n (int): Number of top results to return, default 10
        Returns:
            Optional[discord.Embed]: None when the guild is unavailable or
                there are no results
        """
        guild: Optional[discord.Guild] = self.bot.get_guild(self.HOME_GUILD_ID)
        if not guild:
            return None
        top: Optional[list[tuple]] = await self.get_top(n)
        if not top:
            return None
        lines: list[str] = []
        for x, (uid, count) in enumerate(top):
            member: Optional[discord.Member] = guild.get_member(uid)
            if not member:
                continue
            display_name: str = discord.utils.escape_markdown(member.display_name)
            lines.append(f"{x+1}. **{display_name}**: *{count}*")
        top_formatted: str = "\n".join(lines).strip()
        embed: discord.Embed = discord.Embed(
            title=f"Top {n} Memes", description=top_formatted, colour=0x25BD6B
        )
        return embed

    @tasks.loop(seconds=30, reconnect=True)
    async def update_meme_lb(self) -> None:
        """Update the Meme Leaderboard"""
        try:
            top_embed = await self.get_top_embed(n=10)
            if not top_embed:
                # Nothing to show (or a transient failure): keep the existing
                # leaderboard message rather than blanking its embed.
                return
            channel = self.bot.get_channel(self.LB_CHANID)
            if not isinstance(channel, discord.TextChannel):
                return
            message_to_edit = await channel.fetch_message(self.LB_MESSAGE_ID)
            await message_to_edit.edit(
                embed=top_embed,
                content="## This message will automatically update periodically.",
            )
        except Exception:
            # CancelledError is not caught here, so cog_unload can stop the loop.
            traceback.print_exc()

    @bridge.bridge_command(hidden=True)
    @commands.is_owner()
    async def doembed(self, ctx) -> None:
        """Do Meme Embed"""
        meme_lb_chan: Union[discord.TextChannel, Any] = self.bot.get_channel(
            self.LB_CHANID
        )
        if not meme_lb_chan:
            await ctx.respond("Leaderboard channel not found :(")
            return
        embed = await self.get_top_embed()
        if embed:
            await meme_lb_chan.send(embed=embed)
        else:
            await ctx.respond("NO embed :(")

    def cog_unload(self) -> None:
        """Stop all background loops when the cog is unloaded."""
        self.meme_stream_loop.cancel()
        self.explosm_loop.cancel()
        self.update_meme_lb.cancel()


def setup(bot) -> None:
    """Run on Cog Load"""
    bot.add_cog(Meme(bot))