2025-02-13 14:51:35 -05:00
import os
import traceback
import json
import io
import asyncio
import random
2025-03-20 20:49:23 -04:00
from typing import ( LiteralString ,
Optional ,
Any ,
Union )
import aiosqlite as sqlite3
2025-02-13 14:51:35 -05:00
import logging
import textwrap
import regex
import requests
import discord
2025-02-15 08:36:45 -05:00
from disc_havoc import Havoc
2025-02-13 14:51:35 -05:00
from aiohttp import ClientSession
from discord . ext import bridge , commands , tasks
2025-02-16 13:20:53 -05:00
from util . jesusmemes import JesusMemeGenerator
2025-02-13 14:51:35 -05:00
import scrapers . reddit_scrape as memeg
import scrapers . explosm_scrape as explosmg
import scrapers . xkcd_scrape as xkcdg
import scrapers . smbc_scrape as smbcg
import scrapers . qc_scrape as qcg
import scrapers . dinosaur_scrape as dinog
import scrapers . onion_scrape as oniong
import scrapers . thn_scrape as thng
import constants
meme_choices = [ ]
BOT_CHANIDS = [ ]
2025-03-20 20:49:23 -04:00
"""
TODO : Cleanup new meme leaderboard stuff
"""
2025-02-13 14:51:35 -05:00
class Helper :
""" Meme Helper """
def load_meme_choices ( self ) - > None :
""" Load Available Meme Templates from JSON File """
global meme_choices
memes_file : str | LiteralString = os . path . join ( os . path . dirname ( __file__ ) , " memes.json " )
with open ( memes_file , ' r ' , encoding = ' utf-8 ' ) as f :
meme_choices = json . loads ( f . read ( ) )
class MemeView ( discord . ui . View ) :
""" Meme Selection discord.ui.View """
helper = Helper ( )
helper . load_meme_choices ( )
@discord.ui.select (
placeholder = " Choose a Meme! " ,
min_values = 1 ,
max_values = 1 ,
options = [
discord . SelectOption (
label = meme_label
) for meme_label in meme_choices [ 0 : 24 ]
]
)
async def select_callback ( self , select : discord . ui . Select ,
interaction : discord . Interaction ) - > None :
""" Meme Selection Callback """
2025-02-15 08:36:45 -05:00
if not isinstance ( select . values [ 0 ] , str ) :
return
2025-02-13 14:51:35 -05:00
modal : discord . ui . Modal = MemeModal ( meme = select . values [ 0 ] , title = " Meme Selected " )
await interaction . response . send_modal ( modal )
class MemeModal ( discord . ui . Modal ) :
""" Meme Creation discord.ui.Modal """
def __init__ ( self , * args , meme : Optional [ str ] = None , * * kwargs ) - > None :
super ( ) . __init__ ( * args , * * kwargs )
2025-02-15 08:36:45 -05:00
self . selected_meme : Optional [ str ] = meme
2025-02-13 14:51:35 -05:00
self . meme_generator = JesusMemeGenerator ( )
self . TEXT_LIMIT : int = 80
self . add_item ( discord . ui . InputText ( label = " Top Text " ,
style = discord . InputTextStyle . singleline ) )
self . add_item ( discord . ui . InputText ( label = " Bottom Text " ,
style = discord . InputTextStyle . singleline ) )
async def callback ( self , interaction : discord . Interaction ) - > None :
2025-02-15 08:36:45 -05:00
if not self . selected_meme : # No meme selected
return
2025-02-13 14:51:35 -05:00
selected_meme : str = self . selected_meme
2025-02-15 08:36:45 -05:00
if not self . children or len ( self . children ) < 2 : # Invalid request
return
if not isinstance ( self . children [ 0 ] . value , str ) \
or not isinstance ( self . children [ 1 ] . value , str ) : # Invalid request
return
2025-02-13 14:51:35 -05:00
meme_top_line : str = self . children [ 0 ] . value . strip ( )
meme_bottom_line : str = self . children [ 1 ] . value . strip ( )
if len ( meme_top_line ) > self . TEXT_LIMIT or len ( meme_bottom_line ) > self . TEXT_LIMIT :
await interaction . response . send_message ( " ERR: Text is limited to 80 characters for each the top and bottom lines. " )
return
2025-02-16 20:07:02 -05:00
meme_link : Optional [ str ] = await self . meme_generator . create_meme ( top_line = meme_top_line ,
2025-02-13 15:48:39 -05:00
bottom_line = meme_bottom_line , meme = selected_meme )
2025-02-16 20:07:02 -05:00
if not meme_link :
await interaction . response . send_message ( " Failed! " )
return
2025-02-13 14:51:35 -05:00
embed : discord . Embed = discord . Embed ( title = " Generated Meme " )
embed . set_image ( url = meme_link )
2025-02-15 08:36:45 -05:00
embed . add_field ( name = " Meme " , value = selected_meme , inline = True )
2025-02-13 14:51:35 -05:00
await interaction . response . send_message ( embeds = [ embed ] )
return
class Meme ( commands . Cog ) :
""" Meme Cog for Havoc """
2025-02-15 08:36:45 -05:00
def __init__ ( self , bot : Havoc ) - > None :
self . bot : Havoc = bot
2025-03-20 20:49:23 -04:00
self . stats_db_path : LiteralString = os . path . join ( " /usr/local/share " ,
" sqlite_dbs " , " stats.db " )
2025-02-13 14:51:35 -05:00
self . meme_choices : list = [ ]
self . meme_counter : int = 0
2025-02-15 08:36:45 -05:00
self . THREADS : dict [ str , dict [ int , list ] ] = {
2025-02-13 14:51:35 -05:00
# Format: Guild1: [ChanId : [Webhook, ThreadId], Guild2: [ChanId : [Webhook, ThreadId]
' comic_explosm ' : {
1298729744216359055 : [ constants . EXPLOSM_WEBHOOK , 1299165855493390367 ] ,
1306414795049926676 : [ constants . EXPLOSM_WEBHOOK2 , 1306416492304138364 ] ,
} ,
' comic_xkcd ' : {
1298729744216359055 : [ constants . XKCD_WEBHOOK , 1299165928755433483 ] ,
1306414795049926676 : [ constants . XKCD_WEBHOOK2 , 1306416681991798854 ] ,
} ,
' comic_smbc ' : {
1298729744216359055 : [ constants . SMBC_WEBHOOK , 1299166071038808104 ] ,
1306414795049926676 : [ constants . SMBC_WEBHOOK2 , 1306416842511745024 ] ,
} ,
' comic_qc ' : {
1298729744216359055 : [ constants . QC_WEBHOOK , 1299392115364593674 ] ,
1306414795049926676 : [ constants . QC_WEBHOOK2 , 1306417084774744114 ] ,
} ,
' comic_dino ' : {
1298729744216359055 : [ constants . DINO_WEBHOOK , 1299771918886506557 ] ,
1306414795049926676 : [ constants . DINO_WEBHOOK2 , 1306417286713704548 ] ,
}
}
2025-02-15 08:36:45 -05:00
self . NO_THREAD_WEBHOOKS : dict [ str , list ] = {
2025-02-13 14:51:35 -05:00
' theonion ' : [ constants . ONION_WEBHOOK , constants . ONION_WEBHOOK2 ] ,
' thn ' : [ constants . THN_WEBHOOK ] ,
' memes ' : [ constants . MEME_WEBHOOK1 , constants . MEME_WEBHOOK2 ] ,
}
global BOT_CHANIDS
BOT_CHANIDS = self . bot . BOT_CHANIDS # Inherit
self . meme_stream_loop . start ( )
self . explosm_loop . start ( )
2025-03-20 20:49:23 -04:00
self . update_meme_lb . start ( )
asyncio . get_event_loop ( ) . create_task ( self . init_meme_leaderboard ( ) )
2025-02-15 08:36:45 -05:00
def is_spamchan ( ) - > bool : # type: ignore
2025-02-13 14:51:35 -05:00
""" Check if channel is spamchan """
def predicate ( ctx ) :
try :
if not ctx . channel . id in BOT_CHANIDS :
logging . debug ( " %s not found in %s " , ctx . channel . id , BOT_CHANIDS )
return ctx . channel . id in BOT_CHANIDS
except :
traceback . print_exc ( )
return False
2025-02-15 08:36:45 -05:00
return commands . check ( predicate ) # type: ignore
2025-02-13 14:51:35 -05:00
2025-03-20 20:49:23 -04:00
async def leaderboard_increment ( self ,
uid : int ) - > None :
"""
Increment leaderboard for uid
Args :
uid ( int ) :
Returns :
None
"""
if not uid in self . meme_leaderboard :
self . meme_leaderboard [ uid ] = 1
else :
self . meme_leaderboard [ uid ] + = 1
async with sqlite3 . connect ( self . stats_db_path , timeout = 2 ) as db_conn :
2025-03-20 20:56:26 -04:00
""" Attempts both insert/update """
query_1 : str = " UPDATE memes SET count = count + 1 WHERE discord_uid = ? "
query_1_params : tuple = ( uid , )
query_2 : str = " INSERT INTO memes (discord_uid, count) VALUES (?, ?) "
query_2_params : tuple = ( uid , self . meme_leaderboard [ uid ] )
2025-03-21 16:05:12 -04:00
try :
await db_conn . execute ( query_1 , query_1_params )
except :
pass
try :
await db_conn . execute ( query_2 , query_2_params )
except :
pass
2025-03-20 20:56:26 -04:00
await db_conn . commit ( )
2025-03-25 06:25:04 -04:00
try :
await self . update_meme_lb ( )
except Exception as e :
logging . info ( " Failed to update meme leaderboard following increment: %s " ,
str ( e ) )
2025-03-20 20:49:23 -04:00
async def init_meme_leaderboard ( self ) - > None :
"""
INIT MEME LEADERBOARD
"""
self . meme_leaderboard : dict [ int , int ] = { }
async with sqlite3 . connect ( self . stats_db_path , timeout = 2 ) as db_conn :
db_conn . row_factory = sqlite3 . Row
db_query : str = " SELECT discord_uid, count FROM memes WHERE count > 0 "
async with db_conn . execute ( db_query ) as db_cursor :
results = await db_cursor . fetchall ( )
for result in results :
uid = result [ ' discord_uid ' ]
count = result [ ' count ' ]
self . meme_leaderboard [ uid ] = count
2025-02-13 14:51:35 -05:00
2025-03-20 20:49:23 -04:00
@commands.Cog.listener ( )
async def on_ready ( self ) - > None :
""" Run on Bot Ready """
await self . init_meme_leaderboard ( )
2025-02-13 14:51:35 -05:00
async def do_autos ( self , only_comics : Optional [ bool ] = False ) - > None :
"""
Run Auto Posters
2025-02-16 20:07:02 -05:00
2025-02-13 14:51:35 -05:00
Args :
only_comics ( Optional [ bool ] ) : default False
Returns :
None
"""
try :
meme_grabber = memeg . MemeGrabber ( )
explosm_grabber = explosmg . ExplosmGrabber ( )
xkcd_grabber = xkcdg . XKCDGrabber ( )
smbc_grabber = smbcg . SMBCGrabber ( )
qc_grabber = qcg . QCGrabber ( )
dino_grabber = dinog . DinosaurGrabber ( )
onion_grabber = oniong . OnionGrabber ( )
thn_grabber = thng . THNGrabber ( )
2025-02-15 08:36:45 -05:00
explosm_comics : list [ Optional [ tuple ] ] = [ ]
xkcd_comics : list [ Optional [ tuple ] ] = [ ]
smbc_comics : list [ Optional [ tuple ] ] = [ ]
dino_comics : list [ Optional [ tuple ] ] = [ ]
onions : list [ Optional [ tuple ] ] = [ ]
thns : list [ Optional [ tuple ] ] = [ ]
memes : list [ Optional [ tuple ] ] = await meme_grabber . get ( )
2025-02-13 14:51:35 -05:00
try :
try :
2025-02-15 08:36:45 -05:00
explosm_comics = await explosm_grabber . get ( )
2025-02-13 14:51:35 -05:00
except :
pass
try :
2025-02-15 08:36:45 -05:00
xkcd_comics = await xkcd_grabber . get ( )
2025-02-13 14:51:35 -05:00
except :
pass
try :
2025-02-15 08:36:45 -05:00
smbc_comics = await smbc_grabber . get ( )
2025-02-13 14:51:35 -05:00
except :
pass
try :
2025-02-15 08:36:45 -05:00
qc_comics = await qc_grabber . get ( )
2025-02-13 14:51:35 -05:00
print ( f " QC: { qc_comics } " )
except :
pass
try :
2025-02-15 08:36:45 -05:00
dino_comics = await dino_grabber . get ( )
2025-02-13 14:51:35 -05:00
except Exception as e :
logging . debug ( " Dino failed: %s " , str ( e ) )
pass
try :
2025-02-15 08:36:45 -05:00
onions = await onion_grabber . get ( )
2025-02-13 14:51:35 -05:00
except Exception as e :
logging . debug ( " Onion failed: %s " , str ( e ) )
pass
try :
2025-02-15 08:36:45 -05:00
thns = await thn_grabber . get ( )
2025-02-13 14:51:35 -05:00
except Exception as e :
logging . debug ( " THNs failed: %s " , str ( e ) )
pass
except :
traceback . print_exc ( )
agents : list [ str ] = constants . HTTP_UA_LIST
headers : dict = {
' User-Agent ' : random . choice ( agents )
}
if not only_comics :
try :
for meme in memes :
2025-02-15 08:36:45 -05:00
if not meme :
continue
2025-03-12 09:37:44 -04:00
( meme_id , meme_title , meme_url ) = meme
2025-02-13 14:51:35 -05:00
request = requests . get ( meme_url , stream = True , timeout = ( 5 , 30 ) , headers = headers )
if not request . status_code == 200 :
continue
meme_content : bytes = request . raw . read ( )
2025-02-15 08:36:45 -05:00
for meme_hook in self . NO_THREAD_WEBHOOKS . get ( ' memes ' , { } ) :
2025-02-13 14:51:35 -05:00
meme_image : io . BytesIO = io . BytesIO ( meme_content )
ext : str = meme_url . split ( " . " ) [ - 1 ] \
. split ( " ? " ) [ 0 ] . split ( " & " ) [ 0 ]
async with ClientSession ( ) as session :
webhook : discord . Webhook = discord . Webhook . from_url ( meme_hook ,
session = session )
await webhook . send ( file = discord . File ( meme_image ,
filename = f ' img. { ext } ' ) , username = " r/memes " )
await asyncio . sleep ( 2 )
except :
pass
try :
for comic in explosm_comics :
2025-02-15 08:36:45 -05:00
if not comic :
continue
2025-02-13 14:51:35 -05:00
( comic_title , comic_url ) = comic
2025-02-15 08:36:45 -05:00
comic_title = discord . utils . escape_markdown ( comic_title )
2025-02-13 14:51:35 -05:00
comic_request = requests . get ( comic_url , stream = True , timeout = ( 5 , 20 ) , headers = headers )
comic_request . raise_for_status ( )
comic_content : bytes = comic_request . raw . read ( )
2025-02-15 08:36:45 -05:00
ext = comic_url . split ( " . " ) [ - 1 ] \
2025-02-13 14:51:35 -05:00
. split ( " ? " ) [ 0 ] . split ( " & " ) [ 0 ]
async with ClientSession ( ) as session :
2025-02-15 08:36:45 -05:00
for chanid , _hook in self . THREADS . get ( ' comic_explosm ' , { } ) . items ( ) :
2025-02-13 14:51:35 -05:00
comic_image : io . BytesIO = io . BytesIO ( comic_content )
channel : int = chanid
2025-02-15 08:36:45 -05:00
( hook_uri , thread_id ) = _hook
webhook = discord . Webhook . from_url ( hook_uri ,
2025-02-13 14:51:35 -05:00
session = session )
2025-02-15 08:36:45 -05:00
_channel : Any = self . bot . get_channel ( channel )
if not _channel :
return
thread = _channel . get_thread ( thread_id )
2025-02-13 14:51:35 -05:00
await webhook . send ( f " ** { comic_title } ** " , file = discord . File ( comic_image , filename = f ' img. { ext } ' ) ,
username = " Cyanide & Happiness " , thread = thread )
await asyncio . sleep ( 2 )
except :
pass
try :
for comic in xkcd_comics :
2025-02-15 08:36:45 -05:00
if not comic :
continue
2025-02-13 14:51:35 -05:00
( comic_title , comic_url ) = comic
2025-02-15 08:36:45 -05:00
comic_title = discord . utils . escape_markdown ( comic_title )
2025-02-13 14:51:35 -05:00
comic_request = requests . get ( comic_url , stream = True , timeout = ( 5 , 20 ) , headers = headers )
comic_request . raise_for_status ( )
2025-02-15 08:36:45 -05:00
comic_content = comic_request . raw . read ( )
comic_image = io . BytesIO ( comic_request . raw . read ( ) )
ext = comic_url . split ( " . " ) [ - 1 ] \
2025-02-13 14:51:35 -05:00
. split ( " ? " ) [ 0 ] . split ( " & " ) [ 0 ]
async with ClientSession ( ) as session :
2025-02-15 08:36:45 -05:00
for chanid , _hook in self . THREADS . get ( ' comic_xkcd ' , { } ) . items ( ) :
comic_image = io . BytesIO ( comic_content )
channel = chanid
( hook_uri , thread_id ) = _hook
webhook = discord . Webhook . from_url ( hook_uri ,
2025-02-13 14:51:35 -05:00
session = session )
2025-02-15 08:36:45 -05:00
_channel = self . bot . get_channel ( channel )
if not _channel :
return
thread = _channel . get_thread ( thread_id )
2025-02-13 14:51:35 -05:00
await webhook . send ( f " ** { comic_title } ** " , file = discord . File ( comic_image , filename = f ' img. { ext } ' ) ,
username = " xkcd " , thread = thread )
await asyncio . sleep ( 2 )
except :
pass
try :
for comic in smbc_comics :
2025-02-15 08:36:45 -05:00
if not comic :
continue
2025-02-13 14:51:35 -05:00
( comic_title , comic_url ) = comic
2025-02-15 08:36:45 -05:00
comic_title = discord . utils . escape_markdown ( comic_title )
2025-02-13 14:51:35 -05:00
comic_request = requests . get ( comic_url , stream = True , timeout = ( 5 , 20 ) , headers = headers )
comic_request . raise_for_status ( )
2025-02-15 08:36:45 -05:00
comic_content = comic_request . raw . read ( )
ext = comic_url . split ( " . " ) [ - 1 ] \
2025-02-13 14:51:35 -05:00
. split ( " ? " ) [ 0 ] . split ( " & " ) [ 0 ]
async with ClientSession ( ) as session :
2025-02-15 08:36:45 -05:00
for chanid , _hook in self . THREADS . get ( ' comic_smbc ' , { } ) . items ( ) :
comic_image = io . BytesIO ( comic_content )
channel = chanid
( hook_uri , thread_id ) = _hook
webhook = discord . Webhook . from_url ( hook_uri ,
2025-02-13 14:51:35 -05:00
session = session )
2025-02-15 08:36:45 -05:00
_channel = self . bot . get_channel ( channel )
if not _channel :
return
thread = _channel . get_thread ( thread_id )
2025-02-13 14:51:35 -05:00
await webhook . send ( f " ** { comic_title } ** " , file = discord . File ( comic_image , filename = f ' img. { ext } ' ) ,
username = " SMBC " , thread = thread )
await asyncio . sleep ( 2 )
except :
pass
try :
for comic in qc_comics :
logging . debug ( " Trying QC... " )
2025-02-15 08:36:45 -05:00
if not comic :
continue
2025-02-13 14:51:35 -05:00
( comic_title , comic_url ) = comic
2025-02-15 08:36:45 -05:00
comic_title = discord . utils . escape_markdown ( comic_title )
comic_url = regex . sub ( r ' ^http://ww \ . ' , ' http://www. ' ,
2025-02-13 14:51:35 -05:00
comic_url )
2025-02-15 08:36:45 -05:00
comic_url = regex . sub ( r ' \ .pmg$ ' , ' .png ' ,
2025-02-13 14:51:35 -05:00
comic_url )
comic_request = requests . get ( comic_url , stream = True ,
timeout = ( 5 , 20 ) , headers = headers )
comic_request . raise_for_status ( )
2025-02-15 08:36:45 -05:00
comic_content = comic_request . raw . read ( )
ext = comic_url . split ( " . " ) [ - 1 ] \
2025-02-13 14:51:35 -05:00
. split ( " ? " ) [ 0 ] . split ( " & " ) [ 0 ]
async with ClientSession ( ) as session :
2025-02-15 08:36:45 -05:00
for chanid , _hook in self . THREADS . get ( ' comic_qc ' , { } ) . items ( ) :
comic_image = io . BytesIO ( comic_content )
channel = chanid
( hook_uri , thread_id ) = _hook
webhook = discord . Webhook . from_url ( hook_uri ,
2025-02-13 14:51:35 -05:00
session = session )
2025-02-15 08:36:45 -05:00
_channel = self . bot . get_channel ( channel )
if not _channel :
return
thread = _channel . get_thread ( thread_id )
2025-02-13 14:51:35 -05:00
await webhook . send ( f " ** { comic_title } ** " , file = discord . File ( comic_image , filename = f ' img. { ext } ' ) ,
username = " Questionable Content " , thread = thread )
await asyncio . sleep ( 2 )
except :
traceback . print_exc ( )
pass
try :
for comic in dino_comics :
2025-02-15 08:36:45 -05:00
if not comic :
continue
2025-02-13 14:51:35 -05:00
( comic_title , comic_url ) = comic
2025-02-15 08:36:45 -05:00
comic_title = discord . utils . escape_markdown ( comic_title )
2025-02-13 14:51:35 -05:00
comic_request = requests . get ( comic_url , stream = True , timeout = ( 5 , 20 ) , headers = headers )
comic_request . raise_for_status ( )
2025-02-15 08:36:45 -05:00
comic_content = comic_request . raw . read ( )
2025-02-13 14:51:35 -05:00
ext = comic_url . split ( " . " ) [ - 1 ] \
. split ( " ? " ) [ 0 ] . split ( " & " ) [ 0 ]
async with ClientSession ( ) as session :
2025-02-15 08:36:45 -05:00
for chanid , _hook in self . THREADS . get ( ' comic_dino ' , { } ) . items ( ) :
comic_image = io . BytesIO ( comic_content )
channel = chanid
( hook_uri , thread_id ) = _hook
webhook = discord . Webhook . from_url ( hook_uri ,
2025-02-13 14:51:35 -05:00
session = session )
2025-02-15 08:36:45 -05:00
_channel = self . bot . get_channel ( channel )
if not _channel :
return
thread = _channel . get_thread ( thread_id )
2025-02-13 14:51:35 -05:00
await webhook . send ( f " ** { comic_title } ** " , file = discord . File ( comic_image , filename = f ' img. { ext } ' ) ,
username = " Dinosaur Comics " , thread = thread )
await asyncio . sleep ( 2 )
except :
pass
try :
for onion in onions :
2025-02-15 08:36:45 -05:00
if not onion :
continue
2025-02-13 14:51:35 -05:00
( onion_title , onion_description , onion_link , onion_video ) = onion
2025-02-15 08:36:45 -05:00
onion_description = textwrap . wrap ( text = onion_description ,
2025-02-13 14:51:35 -05:00
width = 860 , max_lines = 1 ) [ 0 ]
embed : discord . Embed = discord . Embed ( title = onion_title )
embed . add_field ( name = " Content " , value = f " { onion_description [ 0 : 960 ] } \n -# { onion_link } " )
async with ClientSession ( ) as session :
2025-02-15 08:36:45 -05:00
for hook in self . NO_THREAD_WEBHOOKS . get ( ' theonion ' , { } ) :
hook_uri = hook
webhook = discord . Webhook . from_url ( hook_uri ,
2025-02-13 14:51:35 -05:00
session = session )
await webhook . send ( embed = embed , username = " The Onion " )
if onion_video :
await webhook . send ( f " ^ video: { onion_video } " )
await asyncio . sleep ( 2 )
except :
pass
try :
for thn in thns :
logging . debug ( " Trying thn... " )
2025-02-15 08:36:45 -05:00
if not thn :
continue
2025-02-13 14:51:35 -05:00
( thn_title , thn_description , thn_link , thn_pubdate , thn_video ) = thn
2025-02-15 08:36:45 -05:00
thn_description = textwrap . wrap ( text = thn_description ,
2025-02-13 14:51:35 -05:00
width = 860 , max_lines = 1 ) [ 0 ]
2025-02-15 08:36:45 -05:00
embed = discord . Embed ( title = thn_title )
2025-02-13 14:51:35 -05:00
embed . add_field ( name = " Content " , value = f " { thn_description [ 0 : 960 ] } \n -# { thn_link } " )
embed . add_field ( name = " Published " , value = thn_pubdate , inline = False )
async with ClientSession ( ) as session :
2025-02-15 08:36:45 -05:00
for hook in self . NO_THREAD_WEBHOOKS . get ( ' thn ' , { } ) :
hook_uri = hook
webhook = discord . Webhook . from_url ( hook_uri ,
2025-02-13 14:51:35 -05:00
session = session )
await webhook . send ( embed = embed , username = " The Hacker News " )
if thn_video :
await webhook . send ( f " ^ video: { thn_video } " )
await asyncio . sleep ( 2 )
except :
pass
except :
# await self.bot.get_channel(self.MEMESTREAM_CHANID).send(f"FUCK, MY MEEMER! YOU DENTED MY MEEMER!")
traceback . print_exc ( )
@tasks.loop ( hours = 12.0 )
async def meme_stream_loop ( self ) - > None :
""" Meme Stream Loop (r/memes) """
try :
await asyncio . sleep ( 10 ) # Try to ensure we are ready first
self . meme_counter + = 1
if self . meme_counter == 1 :
return await self . do_autos ( only_comics = True ) # Skip first iteration!
await self . do_autos ( )
except :
traceback . print_exc ( )
@tasks.loop ( hours = 0.5 )
async def explosm_loop ( self ) - > None :
""" Comic Loop """
try :
await asyncio . sleep ( 10 ) # Try to ensure we are ready first
await self . do_autos ( only_comics = True )
except :
traceback . print_exc ( )
2025-02-15 08:36:45 -05:00
@bridge.bridge_command ( ) # type: ignore
@is_spamchan ( )
async def meme ( self , ctx ) - > None :
2025-02-13 14:51:35 -05:00
""" Create Meme """
2025-02-15 08:36:45 -05:00
await ctx . respond ( view = MemeView ( ) )
2025-02-13 14:51:35 -05:00
@bridge.bridge_command ( hidden = True )
@commands.is_owner ( )
async def domemestream ( self , ctx ) - > None :
""" Run Meme Stream Auto Post """
try :
await ctx . respond ( " Trying! " , ephemeral = True )
await self . do_autos ( )
except :
await ctx . respond ( " Fuck! :( " , ephemeral = True )
traceback . print_exc ( )
@bridge.bridge_command ( hidden = True )
@commands.is_owner ( )
async def doexplosm ( self , ctx ) - > None :
""" Run Comic Auto Posters """
try :
await ctx . respond ( " Trying! " , ephemeral = True )
await self . do_autos ( only_comics = True )
except :
await ctx . respond ( " Fuck! :( " , ephemeral = True )
traceback . print_exc ( )
2025-03-20 20:49:23 -04:00
@commands.Cog.listener ( )
async def on_message ( self , message : discord . Message ) - > None :
"""
Message hook , to monitor for memes
Also monitors for messages to #memes-top-10 to autodelete, only Havoc may post in #memes-top-10!
"""
lb_chanid : int = 1352373745108652145
if not self . bot . user : # No valid client instance
return
if not isinstance ( message . channel , discord . TextChannel ) :
return
if message . channel . id == lb_chanid \
and not message . author . id == self . bot . user . id :
""" Message to #memes-top-10 not by Havoc, delete it """
await message . delete ( reason = f " Messages to # { message . channel . name } are not allowed " )
removal_embed : discord . Embed = discord . Embed (
title = " Message Deleted " ,
description = f " Your message to **# { message . channel . name } ** has been automatically deleted. \n **Reason**: Messages to this channel by users is not allowed. "
)
await message . author . send ( embed = removal_embed )
if message . author . id == self . bot . user . id : # Bots own message
return
if not message . guild :
return
if not message . channel . id == 1147229098544988261 : # Not meme channel
return
if not message . attachments : # No attachments to consider a meme
return
await self . leaderboard_increment ( message . author . id )
async def get_top ( self , n : int = 10 ) - > Optional [ list [ tuple ] ] :
"""
Get top ( n = 10 ) Memes
Args :
n ( int ) : Number of top results to return , default 10
Returns :
Optional [ dict ]
"""
try :
out_top : list [ tuple [ int , int ] ] = [ ]
async with sqlite3 . connect ( self . stats_db_path , timeout = 2 ) as db_conn :
db_conn . row_factory = sqlite3 . Row
query : str = " SELECT discord_uid, count FROM memes WHERE count > 0 ORDER BY count DESC "
async with db_conn . execute ( query ) as db_cursor :
db_result = await db_cursor . fetchall ( )
for res in db_result :
uid = res [ ' discord_uid ' ]
count = res [ ' count ' ]
out_top . append ( ( uid , count ) )
# Check for and remove missing members
guild_id : int = 1145182936002482196
guild : Optional [ discord . Guild ] = self . bot . get_guild ( guild_id )
if not guild :
return None
for x , entry in enumerate ( out_top ) :
( uid , _ ) = entry
member : Optional [ discord . Member ] = guild . get_member ( uid )
if not member :
out_top . pop ( x )
return out_top [ 0 : ( n + 1 ) ]
except :
traceback . print_exc ( )
return None
async def get_top_embed ( self , n : int = 10 ) - > Optional [ discord . Embed ] :
"""
Get Top Memes Embed
Args :
n ( int ) : Number of top results to return , default 10
Returns :
Optional [ discord . Embed ]
"""
guild_id : int = 1145182936002482196
guild : Optional [ discord . Guild ] = self . bot . get_guild ( guild_id )
if not guild :
return None
top : Optional [ list [ tuple ] ] = await self . get_top ( n )
if not top :
return None
top_formatted : str = " "
for x , item in enumerate ( top ) :
( uid , count ) = item
member : Optional [ discord . Member ] = guild . get_member ( uid )
if not member :
continue
display_name : str = member . display_name
top_formatted + = f " { x + 1 } . ** { discord . utils . escape_markdown ( display_name ) } **: * { count } * \n "
top_formatted = top_formatted . strip ( )
embed : discord . Embed = discord . Embed ( title = f " Top { n } Memes " ,
description = top_formatted ,
2025-03-21 16:05:12 -04:00
colour = 0x25bd6b )
2025-03-20 20:49:23 -04:00
return embed
2025-02-13 14:51:35 -05:00
2025-03-20 20:49:23 -04:00
@tasks.loop ( seconds = 30 , reconnect = True )
async def update_meme_lb ( self ) - > None :
""" Update the Meme Leaderboard """
try :
lb_chanid : int = 1352373745108652145
message_id : int = 1352440888231723070
top_embed = await self . get_top_embed ( n = 10 )
channel = self . bot . get_channel ( lb_chanid )
if not isinstance ( channel , discord . TextChannel ) :
return
message_to_edit = await channel . fetch_message ( message_id )
await message_to_edit . edit ( embed = top_embed ,
content = " ## This message will automatically update periodically. " )
except :
traceback . print_exc ( )
@bridge.bridge_command ( hidden = True )
@commands.is_owner ( )
async def doembed ( self , ctx ) - > None :
""" Do Meme Embed """
meme_lb_chan_id : int = 1352373745108652145
meme_lb_chan : Union [ discord . TextChannel , Any ] = self . bot . get_channel ( meme_lb_chan_id )
embed = await self . get_top_embed ( )
if embed :
await meme_lb_chan . send ( embed = embed )
else :
await ctx . respond ( " NO embed :( " )
2025-02-13 14:51:35 -05:00
def cog_unload ( self ) - > None :
self . meme_stream_loop . cancel ( )
self . explosm_loop . cancel ( )
2025-03-20 20:49:23 -04:00
self . update_meme_lb . cancel ( )
2025-02-13 14:51:35 -05:00
def setup ( bot ) - > None :
""" Run on Cog Load """
bot . add_cog ( Meme ( bot ) )