import os
import traceback
import json
import io
import asyncio
import random
import copy
from PIL import Image, UnidentifiedImageError
import imagehash
from typing import LiteralString, Optional, Any, Union
import aiosqlite as sqlite3
import logging
import textwrap
import regex
import requests
import discord
from disc_havoc import Havoc
from aiohttp import ClientSession
from discord.ext import bridge, commands, tasks
from util.jesusmemes import JesusMemeGenerator
import scrapers.reddit_scrape as memeg
import scrapers.explosm_scrape as explosmg
import scrapers.xkcd_scrape as xkcdg
import scrapers.smbc_scrape as smbcg
import scrapers.qc_scrape as qcg
import scrapers.dinosaur_scrape as dinog
import scrapers.onion_scrape as oniong
import scrapers.thn_scrape as thng
import constants

# Populated by Helper.load_meme_choices() (template labels read from memes.json)
meme_choices = []
# Replaced with the bot's BOT_CHANIDS when the Meme cog is constructed
BOT_CHANIDS = []

# TODO: Cleanup new meme leaderboard stuff


def _fetch_raw(
    url: str,
    timeout: Any,
    headers: Optional[dict] = None,
    raise_for_status: bool = False,
) -> tuple[int, bytes]:
    """
    Download a URL and return its status code and raw (undecoded) body.

    This is a blocking call; coroutines should run it through
    asyncio.to_thread so the event loop is not stalled while waiting.

    Args:
        url (str): URL to download
        timeout (Any): Timeout passed to requests (number or (connect, read) tuple)
        headers (Optional[dict]): Extra request headers
        raise_for_status (bool): Raise requests.HTTPError on 4xx/5xx responses
    Returns:
        tuple[int, bytes]: HTTP status code, raw response body
    """
    with requests.get(
        url, stream=True, timeout=timeout, headers=headers
    ) as response:
        if raise_for_status:
            response.raise_for_status()
        return response.status_code, response.raw.read()


def _url_extension(url: str) -> str:
    """Return the text after the last dot of a URL, minus any query string"""
    return url.split(".")[-1].split("?")[0].split("&")[0]


def _first_wrapped_line(text: Optional[str]) -> str:
    """
    Shorten text to its first wrapped line (width 860)

    Returns an empty string instead of raising when there is nothing to wrap.
    """
    wrapped: list[str] = textwrap.wrap(text=text or "", width=860, max_lines=1)
    return wrapped[0] if wrapped else ""


class Helper:
    """Meme Helper"""

    def load_meme_choices(self) -> None:
        """Load Available Meme Templates from JSON File"""
        global meme_choices

        memes_file: str | LiteralString = os.path.join(
            os.path.dirname(__file__), "memes.json"
        )
        with open(memes_file, "r", encoding="utf-8") as f:
            meme_choices = json.load(f)


class MemeView(discord.ui.View):
    """Meme Selection discord.ui.View"""

    # Runs at class-definition time so meme_choices is populated before the
    # select decorator below builds its option list.
    helper = Helper()
    helper.load_meme_choices()

    @discord.ui.select(
        placeholder="Choose a Meme!",
        min_values=1,
        max_values=1,
        options=[
            discord.SelectOption(label=meme_label) for meme_label in meme_choices[0:24]
        ],
    )
    async def select_callback(
        self, select: discord.ui.Select, interaction: discord.Interaction
    ) -> None:
        """Meme Selection Callback"""
        if not isinstance(select.values[0], str):
            return
        modal: discord.ui.Modal = MemeModal(
            meme=select.values[0], title="Meme Selected"
        )
        await interaction.response.send_modal(modal)


class MemeModal(discord.ui.Modal):
    """Meme Creation discord.ui.Modal"""

    def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.selected_meme: Optional[str] = meme
        self.meme_generator = JesusMemeGenerator()
        self.TEXT_LIMIT: int = 80

        self.add_item(
            discord.ui.InputText(
                label="Top Text", style=discord.InputTextStyle.singleline
            )
        )
        self.add_item(
            discord.ui.InputText(
                label="Bottom Text", style=discord.InputTextStyle.singleline
            )
        )

    async def callback(self, interaction: discord.Interaction) -> None:
        """Validate the submitted text, generate the meme and reply with an embed"""
        if not self.selected_meme:  # No meme selected
            return
        selected_meme: str = self.selected_meme
        if not self.children or len(self.children) < 2:  # Invalid request
            return
        if not isinstance(self.children[0].value, str) or not isinstance(
            self.children[1].value, str
        ):  # Invalid request
            return
        meme_top_line: str = self.children[0].value.strip()
        meme_bottom_line: str = self.children[1].value.strip()
        if (
            len(meme_top_line) > self.TEXT_LIMIT
            or len(meme_bottom_line) > self.TEXT_LIMIT
        ):
            await interaction.response.send_message(
                "ERR: Text is limited to 80 characters for each the top and bottom lines."
            )
            return

        meme_link: Optional[str] = await self.meme_generator.create_meme(
            top_line=meme_top_line, bottom_line=meme_bottom_line, meme=selected_meme
        )
        if not meme_link:
            await interaction.response.send_message("Failed!")
            return
        embed: discord.Embed = discord.Embed(title="Generated Meme")
        embed.set_image(url=meme_link)
        embed.add_field(name="Meme", value=selected_meme, inline=True)
        await interaction.response.send_message(embeds=[embed])
        return


class Meme(commands.Cog):
    """Meme Cog for Havoc"""

    def __init__(self, bot: Havoc) -> None:
        self.bot: Havoc = bot
        self.stats_db_path: str = os.path.join(
            "/usr/local/share", "sqlite_dbs", "stats.db"
        )
        self.memedb_path: str = os.path.join(
            "/usr/local/share", "sqlite_dbs", "meme.db"
        )
        self.meme_choices: list = []
        self.meme_counter: int = 0
        # Set here so leaderboard_increment never sees a missing attribute if
        # it runs before init_meme_leaderboard has completed.
        self.meme_leaderboard: dict[int, int] = {}
        self.THREADS: dict[str, dict[int, list]] = {
            # Format: comic key -> {channel id: [webhook URL, thread id]}
            "comic_explosm": {
                1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367],
                1306414795049926676: [constants.EXPLOSM_WEBHOOK2, 1306416492304138364],
            },
            "comic_xkcd": {
                1298729744216359055: [constants.XKCD_WEBHOOK, 1299165928755433483],
                1306414795049926676: [constants.XKCD_WEBHOOK2, 1306416681991798854],
            },
            "comic_smbc": {
                1298729744216359055: [constants.SMBC_WEBHOOK, 1299166071038808104],
                1306414795049926676: [constants.SMBC_WEBHOOK2, 1306416842511745024],
            },
            "comic_qc": {
                1298729744216359055: [constants.QC_WEBHOOK, 1299392115364593674],
                1306414795049926676: [constants.QC_WEBHOOK2, 1306417084774744114],
            },
            "comic_dino": {
                1298729744216359055: [constants.DINO_WEBHOOK, 1299771918886506557],
                1306414795049926676: [constants.DINO_WEBHOOK2, 1306417286713704548],
            },
        }

        # Feeds posted straight to a channel webhook (no thread)
        self.NO_THREAD_WEBHOOKS: dict[str, list] = {
            "theonion": [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2],
            "thn": [constants.THN_WEBHOOK],
            "memes": [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2],
        }

        global BOT_CHANIDS
        BOT_CHANIDS = self.bot.BOT_CHANIDS  # Inherit

        self.meme_stream_loop.start()
        self.explosm_loop.start()
        self.update_meme_lb.start()
        asyncio.get_event_loop().create_task(self.init_meme_leaderboard())

    def is_spamchan() -> bool:  # type: ignore
        """Check if channel is spamchan"""

        def predicate(ctx):
            try:
                if ctx.channel.id not in BOT_CHANIDS:
                    logging.debug("%s not found in %s", ctx.channel.id, BOT_CHANIDS)
                return ctx.channel.id in BOT_CHANIDS
            except Exception as e:
                logging.debug("Exception: %s", str(e))
                traceback.print_exc()
                return False

        return commands.check(predicate)  # type: ignore

    async def leaderboard_increment(self, uid: int) -> None:
        """
        Increment leaderboard for uid

        Bumps the in-memory counter, persists it to the stats database and
        then refreshes the leaderboard message.

        Args:
            uid (int): Discord user id to credit
        Returns:
            None
        """
        self.meme_leaderboard[uid] = self.meme_leaderboard.get(uid, 0) + 1

        update_query: str = "UPDATE memes SET count = count + 1 WHERE discord_uid = ?"
        insert_query: str = "INSERT INTO memes (discord_uid, count) VALUES (?, ?)"
        try:
            async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
                cursor = await db_conn.execute(update_query, (uid,))
                # Insert only when no existing row was updated; running both
                # statements unconditionally can create duplicate rows.
                if cursor.rowcount == 0:
                    await db_conn.execute(
                        insert_query, (uid, self.meme_leaderboard[uid])
                    )
                await db_conn.commit()
        except sqlite3.Error as e:
            logging.info("Failed to persist meme leaderboard increment: %s", str(e))

        try:
            await self.update_meme_lb()
        except Exception as e:
            logging.info(
                "Failed to update meme leaderboard following increment: %s", str(e)
            )

    async def init_meme_leaderboard(self) -> None:
        """
        INIT MEME LEADERBOARD

        Loads every row with a positive count from the stats database into
        self.meme_leaderboard.
        """
        leaderboard: dict[int, int] = {}
        async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            db_query: str = "SELECT discord_uid, count FROM memes WHERE count > 0"
            async with db_conn.execute(db_query) as db_cursor:
                results = await db_cursor.fetchall()
                for result in results:
                    leaderboard[result["discord_uid"]] = result["count"]
        # Swap in the finished dict so readers never observe a half-built one
        self.meme_leaderboard = leaderboard

    async def insert_meme(
        self, discord_uid: int, timestamp: int, message_id: int, image_url: str
    ) -> Optional[bool]:
        """
        INSERT MEME -> SQLITE DB

        Args:
            discord_uid (int): Author of the meme
            timestamp (int): Message creation time (unix timestamp)
            message_id (int): Id of the message carrying the meme
            image_url (str): URL the image is downloaded from
        Returns:
            Optional[bool]: True when a row was inserted, None otherwise
        """
        try:
            # Blocking download is pushed to a worker thread
            _, image_bytes = await asyncio.to_thread(_fetch_raw, image_url, 20)
            with Image.open(io.BytesIO(image_bytes)) as image:
                phash: str = str(imagehash.phash(image))
        except requests.RequestException as e:
            logging.debug("Failed to download meme %s: %s", image_url, str(e))
            return None
        except (UnidentifiedImageError, OSError):
            # Not an image, or image data could not be decoded
            return None

        query: str = "INSERT INTO memes(discord_uid, timestamp, image, message_ids, phash) VALUES(?, ?, ?, ?, ?)"
        async with sqlite3.connect(self.memedb_path, timeout=2) as db_conn:
            insert = await db_conn.execute_insert(
                query, (discord_uid, timestamp, image_bytes, message_id, phash)
            )
            if insert:
                await db_conn.commit()
                return True
            return None

    async def dupe_check(self, image) -> bool | int:
        """
        CHECK DB FOR DUPLICATE MEMES!

        Args:
            image: PIL image to hash and look up
        Returns:
            bool | int: Stored message_ids value of the first row with the
                same perceptual hash, or False when there is none
        """
        phash: str = str(imagehash.phash(image))
        query: str = "SELECT message_ids FROM memes WHERE phash = ? LIMIT 1"
        async with sqlite3.connect(self.memedb_path, timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with db_conn.execute(query, (phash,)) as db_cursor:
                result = await db_cursor.fetchone()
                if result:
                    return result["message_ids"]
        return False

    @commands.Cog.listener()
    async def on_ready(self) -> None:
        """Run on Bot Ready"""
        await self.init_meme_leaderboard()

    @staticmethod
    async def _safe_get(grabber: Any, label: str) -> list:
        """Fetch items from a scraper, returning an empty list on any failure"""
        try:
            return await grabber.get() or []
        except Exception as e:
            logging.debug("%s failed: %s", label, str(e))
            return []

    @staticmethod
    def _fix_qc_url(comic_url: str) -> str:
        """Repair the two known typos seen in Questionable Content image URLs"""
        comic_url = regex.sub(r"^http://ww\.", "http://www.", comic_url)
        return regex.sub(r"\.pmg$", ".png", comic_url)

    async def _post_memes(self, memes: list, headers: dict) -> None:
        """Download each meme and upload it to every 'memes' webhook"""
        for meme in memes:
            if not meme:
                continue
            (meme_id, meme_title, meme_url) = meme
            status, meme_content = await asyncio.to_thread(
                _fetch_raw, meme_url, (5, 30), headers
            )
            if status != 200:
                continue
            ext: str = _url_extension(meme_url)
            async with ClientSession() as session:
                for meme_hook in self.NO_THREAD_WEBHOOKS.get("memes", []):
                    webhook: discord.Webhook = discord.Webhook.from_url(
                        meme_hook, session=session
                    )
                    await webhook.send(
                        file=discord.File(
                            io.BytesIO(meme_content), filename=f"img.{ext}"
                        ),
                        username="r/memes",
                    )
                    await asyncio.sleep(2)

    async def _post_comics(
        self,
        comics: list,
        thread_key: str,
        username: str,
        headers: dict,
        fix_url: Any = None,
    ) -> None:
        """
        Download each comic and post it to every thread configured for thread_key

        Args:
            comics (list): (title, url) tuples; falsy entries are skipped
            thread_key (str): Key into self.THREADS
            username (str): Display name used for the webhook post
            headers (dict): HTTP headers for the image download
            fix_url: Optional callable applied to the URL before downloading
        """
        targets: dict[int, list] = self.THREADS.get(thread_key, {})
        for comic in comics:
            logging.debug("Trying %s...", username)
            if not comic:
                continue
            (comic_title, comic_url) = comic
            comic_title = discord.utils.escape_markdown(comic_title)
            if fix_url:
                comic_url = fix_url(comic_url)
            _, comic_content = await asyncio.to_thread(
                _fetch_raw, comic_url, (5, 20), headers, raise_for_status=True
            )
            ext: str = _url_extension(comic_url)
            async with ClientSession() as session:
                for chanid, (hook_uri, thread_id) in targets.items():
                    _channel: Any = self.bot.get_channel(chanid)
                    if not _channel:
                        # Skip only this target; the other channels and
                        # feeds must still be processed.
                        logging.debug("Channel %s not found, skipping", chanid)
                        continue
                    thread = _channel.get_thread(thread_id)
                    if not thread:
                        logging.debug("Thread %s not found, skipping", thread_id)
                        continue
                    webhook: discord.Webhook = discord.Webhook.from_url(
                        hook_uri, session=session
                    )
                    # Fresh buffer per upload; a sent file has been consumed
                    await webhook.send(
                        f"**{comic_title}**",
                        file=discord.File(
                            io.BytesIO(comic_content), filename=f"img.{ext}"
                        ),
                        username=username,
                        thread=thread,
                    )
                    await asyncio.sleep(2)

    async def _send_article(
        self, hook_key: str, username: str, embed: discord.Embed, video: Any
    ) -> None:
        """Send an article embed (and its video link, if any) to each webhook under hook_key"""
        async with ClientSession() as session:
            for hook_uri in self.NO_THREAD_WEBHOOKS.get(hook_key, []):
                webhook: discord.Webhook = discord.Webhook.from_url(
                    hook_uri, session=session
                )
                await webhook.send(embed=embed, username=username)
                if video:
                    await webhook.send(f"^ video: {video}")
                await asyncio.sleep(2)

    async def do_autos(self, only_comics: Optional[bool] = False) -> None:
        """
        Run Auto Posters

        Fetches new items from every scraper, then posts them. A failure in
        one feed is logged and does not stop the remaining feeds.

        Args:
            only_comics (Optional[bool]): When true the r/memes feed is not
                posted (comics, The Onion and THN still are). Default False
        Returns:
            None
        """
        try:
            meme_grabber = memeg.MemeGrabber()
            explosm_grabber = explosmg.ExplosmGrabber()
            xkcd_grabber = xkcdg.XKCDGrabber()
            smbc_grabber = smbcg.SMBCGrabber()
            qc_grabber = qcg.QCGrabber()
            dino_grabber = dinog.DinosaurGrabber()
            onion_grabber = oniong.OnionGrabber()
            thn_grabber = thng.THNGrabber()

            # NOTE(review): memes are fetched even when only_comics is set and
            # they will not be posted; kept as-is - confirm this is intended.
            memes: list[Optional[tuple]] = await meme_grabber.get()

            explosm_comics: list = await self._safe_get(explosm_grabber, "Explosm")
            xkcd_comics: list = await self._safe_get(xkcd_grabber, "XKCD")
            smbc_comics: list = await self._safe_get(smbc_grabber, "SMBC")
            qc_comics: list = await self._safe_get(qc_grabber, "QC")
            logging.debug("QC: %s", qc_comics)
            dino_comics: list = await self._safe_get(dino_grabber, "Dino")
            onions: list = await self._safe_get(onion_grabber, "Onion")
            thns: list = await self._safe_get(thn_grabber, "THNs")

            agents: list[str] = constants.HTTP_UA_LIST
            headers: dict = {"User-Agent": random.choice(agents)}

            if not only_comics:
                try:
                    await self._post_memes(memes, headers)
                except Exception as e:
                    logging.debug("Posting memes failed: %s", str(e))

            comic_jobs: tuple = (
                (explosm_comics, "comic_explosm", "Cyanide & Happiness", None),
                (xkcd_comics, "comic_xkcd", "xkcd", None),
                (smbc_comics, "comic_smbc", "SMBC", None),
                (qc_comics, "comic_qc", "Questionable Content", self._fix_qc_url),
                (dino_comics, "comic_dino", "Dinosaur Comics", None),
            )
            for comics, thread_key, username, fix_url in comic_jobs:
                try:
                    await self._post_comics(
                        comics, thread_key, username, headers, fix_url
                    )
                except Exception as e:
                    logging.debug("Posting %s failed: %s", thread_key, str(e))

            try:
                for onion in onions:
                    if not onion:
                        continue
                    (onion_title, onion_description, onion_link, onion_video) = onion
                    onion_description = _first_wrapped_line(onion_description)
                    embed: discord.Embed = discord.Embed(title=onion_title)
                    embed.add_field(
                        name="Content",
                        value=f"{onion_description[0:960]}\n-# {onion_link}",
                    )
                    await self._send_article(
                        "theonion", "The Onion", embed, onion_video
                    )
            except Exception as e:
                logging.debug("Posting The Onion failed: %s", str(e))

            try:
                for thn in thns:
                    logging.debug("Trying thn...")
                    if not thn:
                        continue
                    (thn_title, thn_description, thn_link, thn_pubdate, thn_video) = thn
                    thn_description = _first_wrapped_line(thn_description)
                    embed = discord.Embed(title=thn_title)
                    embed.add_field(
                        name="Content", value=f"{thn_description[0:960]}\n-# {thn_link}"
                    )
                    embed.add_field(name="Published", value=thn_pubdate, inline=False)
                    await self._send_article("thn", "The Hacker News", embed, thn_video)
            except Exception as e:
                logging.debug("Posting THN failed: %s", str(e))
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()

    @tasks.loop(hours=12.0)
    async def meme_stream_loop(self) -> None:
        """Meme Stream Loop (r/memes)"""
        try:
            await asyncio.sleep(10)  # Try to ensure we are ready first
            self.meme_counter += 1
            if self.meme_counter == 1:
                return await self.do_autos(only_comics=True)  # Skip first iteration!

            await self.do_autos()
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()

    @tasks.loop(hours=0.5)
    async def explosm_loop(self) -> None:
        """Comic Loop"""
        try:
            await asyncio.sleep(10)  # Try to ensure we are ready first
            await self.do_autos(only_comics=True)
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()

    @bridge.bridge_command()  # type: ignore
    @is_spamchan()
    async def meme(self, ctx) -> None:
        """Create Meme"""
        await ctx.respond(view=MemeView())

    @bridge.bridge_command(hidden=True)
    @commands.is_owner()
    async def domemestream(self, ctx) -> None:
        """Run Meme Stream Auto Post"""
        try:
            await ctx.respond("Trying!", ephemeral=True)
            await self.do_autos()
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()
            await ctx.respond("Fuck! :(", ephemeral=True)

    @bridge.bridge_command(hidden=True)
    @commands.is_owner()
    async def doexplosm(self, ctx) -> None:
        """Run Comic Auto Posters"""
        try:
            await ctx.respond("Trying!", ephemeral=True)
            await self.do_autos(only_comics=True)
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()
            await ctx.respond("Fuck! :(", ephemeral=True)

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message) -> None:
        """
        Message hook, to monitor for memes
        Also monitors for messages to #memes-top-10 to autodelete, only Havoc may post in #memes-top-10!
        """
        lb_chanid: int = 1352373745108652145
        meme_chanid: int = 1147229098544988261
        if not self.bot.user:  # No valid client instance
            return
        if not isinstance(message.channel, discord.TextChannel):
            return
        if (
            message.channel.id == lb_chanid
            and not message.author.id == self.bot.user.id
        ):
            # Message to #memes-top-10 not by Havoc, delete it
            await message.delete(
                reason=f"Messages to #{message.channel.name} are not allowed"
            )
            removal_embed: discord.Embed = discord.Embed(
                title="Message Deleted",
                description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed.",
            )
            try:
                await message.author.send(embed=removal_embed)
            except discord.HTTPException as e:
                # e.g. the author does not accept direct messages
                logging.debug("Could not DM removal notice: %s", str(e))
            return  # The leaderboard channel is never a meme channel

        if message.author.id == self.bot.user.id:  # Bots own message
            return
        if not message.guild:
            return
        if message.channel.id not in [
            1157529874936909934,
            meme_chanid,
        ]:  # Not meme channel
            return
        if not message.attachments:  # No attachments to consider a meme
            return

        unique_memes: list = []
        for item in message.attachments:
            if not (item.url and len(item.url) >= 20):
                continue
            try:
                _, image_bytes = await asyncio.to_thread(_fetch_raw, item.url, 20)
                with Image.open(io.BytesIO(image_bytes)) as attachment_image:
                    dupe_check = await self.dupe_check(attachment_image)
            except requests.RequestException as e:
                logging.debug("Failed to download attachment: %s", str(e))
                continue
            except (UnidentifiedImageError, OSError):
                # Attachment is not a decodable image (video, document, ...)
                continue

            if not dupe_check:
                unique_memes.append(item.url)
                continue

            original_message_url: Optional[str] = None
            try:
                original_message = await message.channel.fetch_message(dupe_check)  # type: ignore
                original_message_url = original_message.jump_url
            except discord.HTTPException as e:
                # Original may be deleted or live in the other meme channel
                logging.debug("Could not fetch original meme message: %s", str(e))
            await message.add_reaction(
                emoji="<:quietscheentchen:1255956612804247635>"
            )
            if original_message_url:
                await message.reply(original_message_url)

        if unique_memes:
            await self.leaderboard_increment(message.author.id)
            author_id: int = message.author.id
            timestamp: int = int(message.created_at.timestamp())
            for meme_url in unique_memes:
                await self.insert_meme(author_id, timestamp, message.id, meme_url)

    async def get_top(self, n: int = 10) -> Optional[list[tuple]]:
        """
        Get top (n=10) Memes

        Args:
            n (int): Number of top results to return, default 10
        Returns:
            Optional[list[tuple]]: Up to n (discord_uid, count) tuples ordered
                by count descending and limited to current guild members;
                None when the guild is unavailable or the query fails
        """
        try:
            rows: list[tuple[int, int]] = []
            async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
                db_conn.row_factory = sqlite3.Row
                query: str = "SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC"
                async with db_conn.execute(query) as db_cursor:
                    db_result = await db_cursor.fetchall()
                    for res in db_result:
                        rows.append((res["discord_uid"], res["count"]))

            # Check for and remove missing members
            guild_id: int = 1145182936002482196
            guild: Optional[discord.Guild] = self.bot.get_guild(guild_id)
            if not guild:
                return None
            # Build a new list rather than popping while iterating, which
            # skips the entry following every removal.
            out_top: list[tuple[int, int]] = [
                (uid, count) for (uid, count) in rows if guild.get_member(uid)
            ]
            return out_top[:n]
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()
            return None

    async def get_top_embed(self, n: int = 10) -> Optional[discord.Embed]:
        """
        Get Top Memes Embed

        Args:
            n (int): Number of top results to return, default 10
        Returns:
            Optional[discord.Embed]: None when the guild or results are unavailable
        """
        guild_id: int = 1145182936002482196
        guild: Optional[discord.Guild] = self.bot.get_guild(guild_id)
        if not guild:
            return None
        top: Optional[list[tuple]] = await self.get_top(n)
        if not top:
            return None

        lines: list[str] = []
        for uid, count in top:
            member: Optional[discord.Member] = guild.get_member(uid)
            if not member:
                continue
            display_name: str = member.display_name
            lines.append(
                f"{len(lines) + 1}. **{discord.utils.escape_markdown(display_name)}**: *{count}*"
            )
        top_formatted: str = "\n".join(lines).strip()
        embed: discord.Embed = discord.Embed(
            title=f"Top {n} Memes", description=top_formatted, colour=0x25BD6B
        )
        return embed

    @tasks.loop(seconds=30, reconnect=True)
    async def update_meme_lb(self) -> None:
        """Update the Meme Leaderboard"""
        try:
            lb_chanid: int = 1352373745108652145
            message_id: int = 1352440888231723070
            top_embed = await self.get_top_embed(n=10)
            if top_embed is None:
                # Nothing to show (guild not cached yet, query failed, ...);
                # leave the existing message alone rather than clearing it.
                return
            channel = self.bot.get_channel(lb_chanid)
            if not isinstance(channel, discord.TextChannel):
                return
            message_to_edit = await channel.fetch_message(message_id)
            await message_to_edit.edit(
                embed=top_embed,
                content="## This message will automatically update periodically.",
            )
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()

    @bridge.bridge_command(hidden=True)
    @commands.is_owner()
    async def doembed(self, ctx) -> None:
        """Do Meme Embed"""
        meme_lb_chan_id: int = 1352373745108652145
        meme_lb_chan: Union[discord.TextChannel, Any] = self.bot.get_channel(
            meme_lb_chan_id
        )
        embed = await self.get_top_embed()
        if embed and meme_lb_chan:
            await meme_lb_chan.send(embed=embed)
        else:
            await ctx.respond("NO embed :(")

    def cog_unload(self) -> None:
        """Stop the background loops when the cog is unloaded"""
        self.meme_stream_loop.cancel()
        self.explosm_loop.cancel()
        self.update_meme_lb.cancel()


def setup(bot) -> None:
    """Run on Cog Load"""
    bot.add_cog(Meme(bot))