2025-03-20 20:56:26 -04:00

676 lines
30 KiB
Python

import os
import traceback
import json
import io
import asyncio
import random
from typing import (LiteralString,
Optional,
Any,
Union)
import aiosqlite as sqlite3
import logging
import textwrap
import regex
import requests
import discord
from disc_havoc import Havoc
from aiohttp import ClientSession
from discord.ext import bridge, commands, tasks
from util.jesusmemes import JesusMemeGenerator
import scrapers.reddit_scrape as memeg
import scrapers.explosm_scrape as explosmg
import scrapers.xkcd_scrape as xkcdg
import scrapers.smbc_scrape as smbcg
import scrapers.qc_scrape as qcg
import scrapers.dinosaur_scrape as dinog
import scrapers.onion_scrape as oniong
import scrapers.thn_scrape as thng
import constants
meme_choices = []
BOT_CHANIDS = []
"""
TODO: Cleanup new meme leaderboard stuff
"""
class Helper:
"""Meme Helper"""
def load_meme_choices(self) -> None:
"""Load Available Meme Templates from JSON File"""
global meme_choices
memes_file: str|LiteralString = os.path.join(os.path.dirname(__file__), "memes.json")
with open(memes_file, 'r', encoding='utf-8') as f:
meme_choices = json.loads(f.read())
class MemeView(discord.ui.View):
"""Meme Selection discord.ui.View"""
helper = Helper()
helper.load_meme_choices()
@discord.ui.select(
placeholder = "Choose a Meme!",
min_values = 1,
max_values = 1,
options = [
discord.SelectOption(
label=meme_label
) for meme_label in meme_choices[0:24]
]
)
async def select_callback(self, select: discord.ui.Select,
interaction: discord.Interaction) -> None:
"""Meme Selection Callback"""
if not isinstance(select.values[0], str):
return
modal: discord.ui.Modal = MemeModal(meme=select.values[0], title="Meme Selected")
await interaction.response.send_modal(modal)
class MemeModal(discord.ui.Modal):
"""Meme Creation discord.ui.Modal"""
def __init__(self, *args, meme: Optional[str] = None, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.selected_meme: Optional[str] = meme
self.meme_generator = JesusMemeGenerator()
self.TEXT_LIMIT: int = 80
self.add_item(discord.ui.InputText(label="Top Text",
style=discord.InputTextStyle.singleline))
self.add_item(discord.ui.InputText(label="Bottom Text",
style=discord.InputTextStyle.singleline))
async def callback(self, interaction: discord.Interaction) -> None:
if not self.selected_meme: # No meme selected
return
selected_meme: str = self.selected_meme
if not self.children or len(self.children) < 2: # Invalid request
return
if not isinstance(self.children[0].value, str)\
or not isinstance(self.children[1].value, str): # Invalid request
return
meme_top_line: str = self.children[0].value.strip()
meme_bottom_line: str = self.children[1].value.strip()
if len(meme_top_line) > self.TEXT_LIMIT or len(meme_bottom_line) > self.TEXT_LIMIT:
await interaction.response.send_message("ERR: Text is limited to 80 characters for each the top and bottom lines.")
return
meme_link: Optional[str] = await self.meme_generator.create_meme(top_line=meme_top_line,
bottom_line=meme_bottom_line, meme=selected_meme)
if not meme_link:
await interaction.response.send_message("Failed!")
return
embed: discord.Embed = discord.Embed(title="Generated Meme")
embed.set_image(url=meme_link)
embed.add_field(name="Meme", value=selected_meme, inline=True)
await interaction.response.send_message(embeds=[embed])
return
class Meme(commands.Cog):
"""Meme Cog for Havoc"""
def __init__(self, bot: Havoc) -> None:
self.bot: Havoc = bot
self.stats_db_path: LiteralString = os.path.join("/usr/local/share",
"sqlite_dbs", "stats.db")
self.meme_choices: list = []
self.meme_counter: int = 0
self.THREADS: dict[str, dict[int, list]] = {
# Format: Guild1: [ChanId : [Webhook, ThreadId], Guild2: [ChanId : [Webhook, ThreadId]
'comic_explosm': {
1298729744216359055: [constants.EXPLOSM_WEBHOOK, 1299165855493390367],
1306414795049926676: [constants.EXPLOSM_WEBHOOK2, 1306416492304138364],
},
'comic_xkcd': {
1298729744216359055: [constants.XKCD_WEBHOOK, 1299165928755433483],
1306414795049926676: [constants.XKCD_WEBHOOK2, 1306416681991798854],
},
'comic_smbc': {
1298729744216359055: [constants.SMBC_WEBHOOK, 1299166071038808104],
1306414795049926676: [constants.SMBC_WEBHOOK2, 1306416842511745024],
},
'comic_qc': {
1298729744216359055: [constants.QC_WEBHOOK, 1299392115364593674],
1306414795049926676: [constants.QC_WEBHOOK2, 1306417084774744114],
},
'comic_dino': {
1298729744216359055: [constants.DINO_WEBHOOK, 1299771918886506557],
1306414795049926676: [constants.DINO_WEBHOOK2, 1306417286713704548],
}
}
self.NO_THREAD_WEBHOOKS: dict[str, list] = {
'theonion': [constants.ONION_WEBHOOK, constants.ONION_WEBHOOK2],
'thn': [constants.THN_WEBHOOK],
'memes': [constants.MEME_WEBHOOK1, constants.MEME_WEBHOOK2],
}
global BOT_CHANIDS
BOT_CHANIDS = self.bot.BOT_CHANIDS # Inherit
self.meme_stream_loop.start()
self.explosm_loop.start()
self.update_meme_lb.start()
asyncio.get_event_loop().create_task(self.init_meme_leaderboard())
def is_spamchan() -> bool: # type: ignore
"""Check if channel is spamchan"""
def predicate(ctx):
try:
if not ctx.channel.id in BOT_CHANIDS:
logging.debug("%s not found in %s", ctx.channel.id, BOT_CHANIDS)
return ctx.channel.id in BOT_CHANIDS
except:
traceback.print_exc()
return False
return commands.check(predicate) # type: ignore
async def leaderboard_increment(self,
uid: int) -> None:
"""
Increment leaderboard for uid
Args:
uid (int):
Returns:
None
"""
if not uid in self.meme_leaderboard:
self.meme_leaderboard[uid] = 1
else:
self.meme_leaderboard[uid] += 1
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
"""Attempts both insert/update"""
query_1: str = "UPDATE memes SET count = count + 1 WHERE discord_uid = ?"
query_1_params: tuple = (uid,)
query_2: str = "INSERT INTO memes (discord_uid, count) VALUES (?, ?)"
query_2_params: tuple = (uid, self.meme_leaderboard[uid])
await db_conn.execute(query_1, query_1_params)
await db_conn.execute(query_2, query_2_params)
await db_conn.commit()
async def init_meme_leaderboard(self) -> None:
"""
INIT MEME LEADERBOARD
"""
self.meme_leaderboard: dict [int, int] = {}
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
db_query: str = "SELECT discord_uid, count FROM memes WHERE count > 0"
async with db_conn.execute(db_query) as db_cursor:
results = await db_cursor.fetchall()
for result in results:
uid = result['discord_uid']
count = result['count']
self.meme_leaderboard[uid] = count
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Run on Bot Ready"""
await self.init_meme_leaderboard()
async def do_autos(self, only_comics: Optional[bool] = False) -> None:
"""
Run Auto Posters
Args:
only_comics (Optional[bool]): default False
Returns:
None
"""
try:
meme_grabber = memeg.MemeGrabber()
explosm_grabber = explosmg.ExplosmGrabber()
xkcd_grabber = xkcdg.XKCDGrabber()
smbc_grabber = smbcg.SMBCGrabber()
qc_grabber = qcg.QCGrabber()
dino_grabber = dinog.DinosaurGrabber()
onion_grabber = oniong.OnionGrabber()
thn_grabber = thng.THNGrabber()
explosm_comics: list[Optional[tuple]] = []
xkcd_comics: list[Optional[tuple]] = []
smbc_comics: list[Optional[tuple]] = []
dino_comics: list[Optional[tuple]] = []
onions: list[Optional[tuple]] = []
thns: list[Optional[tuple]] = []
memes: list[Optional[tuple]] = await meme_grabber.get()
try:
try:
explosm_comics = await explosm_grabber.get()
except:
pass
try:
xkcd_comics = await xkcd_grabber.get()
except:
pass
try:
smbc_comics = await smbc_grabber.get()
except:
pass
try:
qc_comics = await qc_grabber.get()
print(f"QC: {qc_comics}")
except:
pass
try:
dino_comics = await dino_grabber.get()
except Exception as e:
logging.debug("Dino failed: %s", str(e))
pass
try:
onions = await onion_grabber.get()
except Exception as e:
logging.debug("Onion failed: %s", str(e))
pass
try:
thns = await thn_grabber.get()
except Exception as e:
logging.debug("THNs failed: %s", str(e))
pass
except:
traceback.print_exc()
agents: list[str] = constants.HTTP_UA_LIST
headers: dict = {
'User-Agent': random.choice(agents)
}
if not only_comics:
try:
for meme in memes:
if not meme:
continue
(meme_id, meme_title, meme_url) = meme
request = requests.get(meme_url, stream=True, timeout=(5, 30), headers=headers)
if not request.status_code == 200:
continue
meme_content: bytes = request.raw.read()
for meme_hook in self.NO_THREAD_WEBHOOKS.get('memes', {}):
meme_image: io.BytesIO = io.BytesIO(meme_content)
ext: str = meme_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
webhook: discord.Webhook = discord.Webhook.from_url(meme_hook,
session=session)
await webhook.send(file=discord.File(meme_image,
filename=f'img.{ext}'), username="r/memes")
await asyncio.sleep(2)
except:
pass
try:
for comic in explosm_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content: bytes = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_explosm', {}).items():
comic_image: io.BytesIO = io.BytesIO(comic_content)
channel: int = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
_channel: Any = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Cyanide & Happiness", thread=thread)
await asyncio.sleep(2)
except:
pass
try:
for comic in xkcd_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content = comic_request.raw.read()
comic_image = io.BytesIO(comic_request.raw.read())
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_xkcd', {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="xkcd", thread=thread)
await asyncio.sleep(2)
except:
pass
try:
for comic in smbc_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_smbc', {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="SMBC", thread=thread)
await asyncio.sleep(2)
except:
pass
try:
for comic in qc_comics:
logging.debug("Trying QC...")
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_url = regex.sub(r'^http://ww\.', 'http://www.',
comic_url)
comic_url = regex.sub(r'\.pmg$', '.png',
comic_url)
comic_request = requests.get(comic_url, stream=True,
timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_qc', {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Questionable Content", thread=thread)
await asyncio.sleep(2)
except:
traceback.print_exc()
pass
try:
for comic in dino_comics:
if not comic:
continue
(comic_title, comic_url) = comic
comic_title = discord.utils.escape_markdown(comic_title)
comic_request = requests.get(comic_url, stream=True, timeout=(5, 20), headers=headers)
comic_request.raise_for_status()
comic_content = comic_request.raw.read()
ext = comic_url.split(".")[-1]\
.split("?")[0].split("&")[0]
async with ClientSession() as session:
for chanid, _hook in self.THREADS.get('comic_dino', {}).items():
comic_image = io.BytesIO(comic_content)
channel = chanid
(hook_uri, thread_id) = _hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
_channel = self.bot.get_channel(channel)
if not _channel:
return
thread = _channel.get_thread(thread_id)
await webhook.send(f"**{comic_title}**", file=discord.File(comic_image, filename=f'img.{ext}'),
username="Dinosaur Comics", thread=thread)
await asyncio.sleep(2)
except:
pass
try:
for onion in onions:
if not onion:
continue
(onion_title, onion_description, onion_link, onion_video) = onion
onion_description = textwrap.wrap(text=onion_description,
width=860, max_lines=1)[0]
embed: discord.Embed = discord.Embed(title=onion_title)
embed.add_field(name="Content", value=f"{onion_description[0:960]}\n-# {onion_link}")
async with ClientSession() as session:
for hook in self.NO_THREAD_WEBHOOKS.get('theonion', {}):
hook_uri = hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
await webhook.send(embed=embed, username="The Onion")
if onion_video:
await webhook.send(f"^ video: {onion_video}")
await asyncio.sleep(2)
except:
pass
try:
for thn in thns:
logging.debug("Trying thn...")
if not thn:
continue
(thn_title, thn_description, thn_link, thn_pubdate, thn_video) = thn
thn_description = textwrap.wrap(text=thn_description,
width=860, max_lines=1)[0]
embed = discord.Embed(title=thn_title)
embed.add_field(name="Content", value=f"{thn_description[0:960]}\n-# {thn_link}")
embed.add_field(name="Published", value=thn_pubdate, inline=False)
async with ClientSession() as session:
for hook in self.NO_THREAD_WEBHOOKS.get('thn', {}):
hook_uri = hook
webhook = discord.Webhook.from_url(hook_uri,
session=session)
await webhook.send(embed=embed, username="The Hacker News")
if thn_video:
await webhook.send(f"^ video: {thn_video}")
await asyncio.sleep(2)
except:
pass
except:
# await self.bot.get_channel(self.MEMESTREAM_CHANID).send(f"FUCK, MY MEEMER! YOU DENTED MY MEEMER!")
traceback.print_exc()
@tasks.loop(hours=12.0)
async def meme_stream_loop(self) -> None:
"""Meme Stream Loop (r/memes)"""
try:
await asyncio.sleep(10) # Try to ensure we are ready first
self.meme_counter += 1
if self.meme_counter == 1:
return await self.do_autos(only_comics=True) # Skip first iteration!
await self.do_autos()
except:
traceback.print_exc()
@tasks.loop(hours=0.5)
async def explosm_loop(self) -> None:
"""Comic Loop"""
try:
await asyncio.sleep(10) # Try to ensure we are ready first
await self.do_autos(only_comics=True)
except:
traceback.print_exc()
@bridge.bridge_command() # type: ignore
@is_spamchan()
async def meme(self, ctx) -> None:
"""Create Meme"""
await ctx.respond(view=MemeView())
@bridge.bridge_command(hidden=True)
@commands.is_owner()
async def domemestream(self, ctx) -> None:
"""Run Meme Stream Auto Post"""
try:
await ctx.respond("Trying!", ephemeral=True)
await self.do_autos()
except:
await ctx.respond("Fuck! :(", ephemeral=True)
traceback.print_exc()
@bridge.bridge_command(hidden=True)
@commands.is_owner()
async def doexplosm(self, ctx) -> None:
"""Run Comic Auto Posters"""
try:
await ctx.respond("Trying!", ephemeral=True)
await self.do_autos(only_comics=True)
except:
await ctx.respond("Fuck! :(", ephemeral=True)
traceback.print_exc()
@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
"""
Message hook, to monitor for memes
Also monitors for messages to #memes-top-10 to autodelete, only Havoc may post in #memes-top-10!
"""
lb_chanid: int = 1352373745108652145
if not self.bot.user: # No valid client instance
return
if not isinstance(message.channel, discord.TextChannel):
return
if message.channel.id == lb_chanid\
and not message.author.id == self.bot.user.id:
"""Message to #memes-top-10 not by Havoc, delete it"""
await message.delete(reason=f"Messages to #{message.channel.name} are not allowed")
removal_embed: discord.Embed = discord.Embed(
title="Message Deleted",
description=f"Your message to **#{message.channel.name}** has been automatically deleted.\n**Reason**: Messages to this channel by users is not allowed."
)
await message.author.send(embed=removal_embed)
if message.author.id == self.bot.user.id: # Bots own message
return
if not message.guild:
return
if not message.channel.id == 1147229098544988261: # Not meme channel
return
if not message.attachments: # No attachments to consider a meme
return
await self.leaderboard_increment(message.author.id)
async def get_top(self, n: int = 10) -> Optional[list[tuple]]:
"""
Get top (n=10) Memes
Args:
n (int): Number of top results to return, default 10
Returns:
Optional[dict]
"""
try:
out_top: list[tuple[int, int]] = []
async with sqlite3.connect(self.stats_db_path, timeout=2) as db_conn:
db_conn.row_factory = sqlite3.Row
query: str = "SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC"
async with db_conn.execute(query) as db_cursor:
db_result = await db_cursor.fetchall()
for res in db_result:
uid = res['discord_uid']
count = res['count']
out_top.append((uid, count))
# Check for and remove missing members
guild_id: int = 1145182936002482196
guild: Optional[discord.Guild] = self.bot.get_guild(guild_id)
if not guild:
return None
for x, entry in enumerate(out_top):
(uid, _) = entry
member: Optional[discord.Member] = guild.get_member(uid)
if not member:
out_top.pop(x)
return out_top[0:(n+1)]
except:
traceback.print_exc()
return None
async def get_top_embed(self, n:int = 10) -> Optional[discord.Embed]:
"""
Get Top Memes Embed
Args:
n (int): Number of top results to return, default 10
Returns:
Optional[discord.Embed]
"""
guild_id: int = 1145182936002482196
guild: Optional[discord.Guild] = self.bot.get_guild(guild_id)
if not guild:
return None
top: Optional[list[tuple]] = await self.get_top(n)
if not top:
return None
top_formatted: str = ""
for x, item in enumerate(top):
(uid, count) = item
member: Optional[discord.Member] = guild.get_member(uid)
if not member:
continue
display_name: str = member.display_name
top_formatted += f"{x+1}. **{discord.utils.escape_markdown(display_name)}**: *{count}*\n"
top_formatted = top_formatted.strip()
embed: discord.Embed = discord.Embed(title=f"Top {n} Memes",
description=top_formatted,
colour=0xff00ff)
return embed
@tasks.loop(seconds=30, reconnect=True)
async def update_meme_lb(self) -> None:
"""Update the Meme Leaderboard"""
try:
lb_chanid: int = 1352373745108652145
message_id: int = 1352440888231723070
top_embed = await self.get_top_embed(n=10)
channel = self.bot.get_channel(lb_chanid)
if not isinstance(channel, discord.TextChannel):
return
message_to_edit = await channel.fetch_message(message_id)
await message_to_edit.edit(embed=top_embed,
content="## This message will automatically update periodically.")
except:
traceback.print_exc()
@bridge.bridge_command(hidden=True)
@commands.is_owner()
async def doembed(self, ctx) -> None:
"""Do Meme Embed"""
meme_lb_chan_id: int = 1352373745108652145
meme_lb_chan: Union[discord.TextChannel, Any] = self.bot.get_channel(meme_lb_chan_id)
embed = await self.get_top_embed()
if embed:
await meme_lb_chan.send(embed=embed)
else:
await ctx.respond("NO embed :(")
def cog_unload(self) -> None:
self.meme_stream_loop.cancel()
self.explosm_loop.cancel()
self.update_meme_lb.cancel()
def setup(bot) -> None:
"""Run on Cog Load"""
bot.add_cog(Meme(bot))