misc/migration related
This commit is contained in:
		
							
								
								
									
										34
									
								
								cogs/meme.py
									
									
									
									
									
								
							
							
						
						
									
										34
									
								
								cogs/meme.py
									
									
									
									
									
								
							| @@ -136,10 +136,10 @@ class Meme(commands.Cog): | ||||
|     def __init__(self, bot: Havoc) -> None: | ||||
|         self.bot: Havoc = bot | ||||
|         self.stats_db_path: str = os.path.join( | ||||
|             "/mnt/data/share", "sqlite_dbs", "stats.db" | ||||
|             "/", "usr", "local", "share", "sqlite_dbs", "stats.db" | ||||
|         ) | ||||
|         self.memedb_path: str = os.path.join( | ||||
|             "/mnt/data/share", "sqlite_dbs", "meme.db" | ||||
|         self.meme_db_path: str = os.path.join( | ||||
|             "/", "usr", "local", "share", "sqlite_dbs", "meme.db" | ||||
|         ) | ||||
|         self.meme_choices: list = [] | ||||
|         self.meme_counter: int = 0 | ||||
| @@ -264,9 +264,10 @@ class Meme(commands.Cog): | ||||
|             phash: str = str(imagehash.phash(_image)) | ||||
|             query: str = "INSERT INTO memes(discord_uid, timestamp, image, message_ids, phash) VALUES(?, ?, ?, ?, ?)" | ||||
|             image.seek(0) | ||||
|             async with sqlite3.connect(self.memedb_path, timeout=5) as db_conn: | ||||
|             converted = self.convert_to_png(image) | ||||
|             async with sqlite3.connect(self.meme_db_path, timeout=5) as db_conn: | ||||
|                 insert = await db_conn.execute_insert( | ||||
|                     query, (discord_uid, timestamp, image.read(), message_id, phash) | ||||
|                     query, (discord_uid, timestamp, converted, message_id, phash) | ||||
|                 ) | ||||
|                 if insert: | ||||
|                     await db_conn.commit() | ||||
| @@ -286,7 +287,7 @@ class Meme(commands.Cog): | ||||
|         try: | ||||
|             phash: str = str(imagehash.phash(image)) | ||||
|             query: str = "SELECT message_ids FROM memes WHERE phash = ? LIMIT 1" | ||||
|             async with sqlite3.connect(self.memedb_path, timeout=2) as db_conn: | ||||
|             async with sqlite3.connect(self.meme_db_path, timeout=2) as db_conn: | ||||
|                 db_conn.row_factory = sqlite3.Row | ||||
|                 async with await db_conn.execute(query, (phash,)) as db_cursor: | ||||
|                     result = await db_cursor.fetchone() | ||||
| @@ -741,12 +742,15 @@ class Meme(commands.Cog): | ||||
|                 dupe_check = await self.dupe_check(Image.open(image)) | ||||
|                 if dupe_check: | ||||
|                     channel = message.channel | ||||
|                     try: | ||||
|                         original_message = await channel.fetch_message(dupe_check)  # type: ignore | ||||
|                         original_message_url = original_message.jump_url | ||||
|                         await message.add_reaction( | ||||
|                             emoji="<:quietscheentchen:1255956612804247635>" | ||||
|                         ) | ||||
|                         await message.reply(original_message_url) | ||||
|                     except Exception as e: | ||||
|                         logging.info("Failed to mark original image: %s", str(e)) | ||||
|                 else: | ||||
|                     image.seek(0) | ||||
|                     unique_memes.append(image) | ||||
| @@ -856,6 +860,24 @@ class Meme(commands.Cog): | ||||
|         else: | ||||
|             await ctx.respond("NO embed :(") | ||||
|  | ||||
|  | ||||
|     def convert_to_png(self, in_buffer: io.BytesIO) -> bytes: | ||||
|         """ | ||||
|         Convert an in-memory buffer to PNG | ||||
|         Args: | ||||
|             in_buffer (io.BytesIO) | ||||
|         Returns: | ||||
|             bytes | ||||
|         """ | ||||
|         in_buffer.seek(0) | ||||
|         with Image.open(in_buffer) as im: | ||||
|             if im.format == "PNG": | ||||
|                 raise ValueError("Already a PNG") | ||||
|             out_buffer = io.BytesIO() | ||||
|             im.save(out_buffer, format="PNG") | ||||
|             out_buffer.seek(0) | ||||
|         return out_buffer.read() | ||||
|  | ||||
|     def cog_unload(self) -> None: | ||||
|         self.meme_stream_loop.cancel() | ||||
|         self.explosm_loop.cancel() | ||||
|   | ||||
							
								
								
									
										47
									
								
								cogs/misc.py
									
									
									
									
									
								
							
							
						
						
									
										47
									
								
								cogs/misc.py
									
									
									
									
									
								
							| @@ -214,8 +214,51 @@ class Misc(commands.Cog): | ||||
|                 except Exception as e: | ||||
|                     logging.debug("Failed to add xmas reaction: %s", str(e)) | ||||
|                 await ctx.respond( | ||||
|                     f"Only {days} days, {hours} hours, {minutes} minutes,\ | ||||
|                     {seconds} seconds and {ms} ms left! (UTC)".translate(xmas_trans) | ||||
|                     f"Only {days} days, {hours} hours, {minutes} minutes, {seconds} seconds and {ms} ms left until Christmas! (UTC)".translate( | ||||
|                         xmas_trans | ||||
|                     ) | ||||
|                 ) | ||||
|         except Exception as e: | ||||
|             traceback.print_exc() | ||||
|             return await ctx.respond(f"Error: {str(e)}") | ||||
|  | ||||
|     @bridge.bridge_command() | ||||
|     async def halloween(self, ctx) -> None: | ||||
|         """ | ||||
|         Countdown til Halloween! | ||||
|         """ | ||||
|         try: | ||||
|             emojis: dict = { | ||||
|                 "0": "0️⃣", | ||||
|                 "1": "1️⃣", | ||||
|                 "2": "2️⃣", | ||||
|                 "3": "3️⃣", | ||||
|                 "4": "4️⃣", | ||||
|                 "5": "5️⃣", | ||||
|                 "6": "6️⃣", | ||||
|                 "7": "7️⃣", | ||||
|                 "8": "8️⃣", | ||||
|                 "9": "9️⃣", | ||||
|             } | ||||
|             with ctx.channel.typing(): | ||||
|                 countdown = self.util.get_days_to_halloween() | ||||
|                 if not isinstance(countdown, tuple) or len(countdown) < 6: | ||||
|                     return await ctx.respond( | ||||
|                         "Oops, Halloween is cancelled." | ||||
|                     )  # Invalid countdown from util | ||||
|                 (days, hours, minutes, seconds, ms, _) = countdown | ||||
|                 now: datetime.datetime = datetime.datetime.now() | ||||
|                 if now.month == 12 and now.day == 25: | ||||
|                     return await ctx.respond("# IT IS HALLOWEEN!!!!!!!!\n-# GET ROWDY") | ||||
|                 halloween_trans: dict = str.maketrans(emojis) | ||||
|                 try: | ||||
|                     await ctx.message.add_reaction(emoji="🎃") | ||||
|                 except Exception as e: | ||||
|                     logging.debug("Failed to add Halloween reaction: %s", str(e)) | ||||
|                 await ctx.respond( | ||||
|                     f"Only {days} days, {hours} hours, {minutes} minutes, {seconds} seconds and {ms} ms left until Halloween! (UTC)".translate( | ||||
|                         halloween_trans | ||||
|                     ) | ||||
|                 ) | ||||
|         except Exception as e: | ||||
|             traceback.print_exc() | ||||
|   | ||||
| @@ -107,9 +107,7 @@ class Radio(commands.Cog): | ||||
|                 but they exist. | ||||
|                 """ | ||||
|                 logging.info("Detected VC not playing... playing!") | ||||
|                 source: discord.FFmpegAudio = discord.FFmpegOpusAudio( | ||||
|                     self.STREAM_URL | ||||
|                 ) | ||||
|                 source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL) | ||||
|                 vc.play(  # type: ignore | ||||
|                     source, | ||||
|                     after=lambda e: logging.info("Error: %s", e) if e else None, | ||||
|   | ||||
| @@ -61,8 +61,7 @@ class Sing(commands.Cog): | ||||
|                         ) | ||||
|  | ||||
|                 await ctx.respond( | ||||
|                     "*Searching...*", | ||||
|                     ephemeral=True | ||||
|                     "*Searching...*", ephemeral=True | ||||
|                 )  # Must respond to interactions within 3 seconds, per Discord | ||||
|  | ||||
|                 parsed = self.utility.parse_song_input(song, activity) | ||||
|   | ||||
| @@ -10,7 +10,7 @@ class DB: | ||||
|  | ||||
|     def __init__(self, bot) -> None: | ||||
|         self.db_path: str | LiteralString = os.path.join( | ||||
|             "/mnt/data/share", "sqlite_dbs", "lovehate.db" | ||||
|             "/usr/local/share", "sqlite_dbs", "lovehate.db" | ||||
|         ) | ||||
|  | ||||
|     async def get_wholovehates( | ||||
|   | ||||
| @@ -21,14 +21,14 @@ class Util: | ||||
|         self.URL_INSULTAPI: str = "https://insult.mattbas.org/api/insult" | ||||
|         self.COMPLIMENT_GENERATOR = ComplimentGenerator() | ||||
|         self.dbs: dict[str, str | LiteralString] = { | ||||
|             "whisky": os.path.join("/mnt/data/share", "sqlite_dbs", "whiskey.db"), | ||||
|             "drinks": os.path.join("/mnt/data/share", "sqlite_dbs", "cocktails.db"), | ||||
|             "strains": os.path.join("/mnt/data/share", "sqlite_dbs", "strains.db"), | ||||
|             "qajoke": os.path.join("/mnt/data/share", "sqlite_dbs", "qajoke.db"), | ||||
|             "rjokes": os.path.join("/mnt/data/share", "sqlite_dbs", "rjokes.db"), | ||||
|             "randmsg": os.path.join("/mnt/data/share", "sqlite_dbs", "randmsg.db"), | ||||
|             "stats": os.path.join("/mnt/data/share", "sqlite_dbs", "havoc_stats.db"), | ||||
|             "cookies": os.path.join("/mnt/data/share", "sqlite_dbs", "cookies.db"), | ||||
|             "whisky": os.path.join("/usr/local/share", "sqlite_dbs", "whiskey.db"), | ||||
|             "drinks": os.path.join("/usr/local/share", "sqlite_dbs", "cocktails.db"), | ||||
|             "strains": os.path.join("/usr/local/share", "sqlite_dbs", "strains.db"), | ||||
|             "qajoke": os.path.join("/usr/local/share", "sqlite_dbs", "qajoke.db"), | ||||
|             "rjokes": os.path.join("/usr/local/share", "sqlite_dbs", "rjokes.db"), | ||||
|             "randmsg": os.path.join("/usr/local/share", "sqlite_dbs", "randmsg.db"), | ||||
|             "stats": os.path.join("/usr/local/share", "sqlite_dbs", "havoc_stats.db"), | ||||
|             "cookies": os.path.join("/usr/local/share", "sqlite_dbs", "cookies.db"), | ||||
|         } | ||||
|         self.COFFEES: list = [ | ||||
|             "a cup of french-pressed coffee", | ||||
| @@ -165,7 +165,7 @@ class Util: | ||||
|                 query, (counter,) if counter else None | ||||
|             ) as db_cursor: | ||||
|                 result = await db_cursor.fetchone() | ||||
|                 return result | ||||
|                 return dict(result) | ||||
|  | ||||
|     async def get_stats_embed(self) -> Optional[Embed]: | ||||
|         """ | ||||
| @@ -207,9 +207,7 @@ class Util: | ||||
|                 f"UPDATE stats SET {counter} = {counter} + 1" | ||||
|             ) as db_cursor: | ||||
|                 if db_cursor.rowcount < 0: | ||||
|                     logging.critical( | ||||
|                         "[increment_counter] Fail! %s", db_cursor.rowcount | ||||
|                     ) | ||||
|                     logging.critical("[increment_counter] Fail! %s", db_cursor.rowcount) | ||||
|                     return False | ||||
|                 await db_conn.commit() | ||||
|                 return True | ||||
| @@ -547,6 +545,26 @@ class Util: | ||||
|  | ||||
|         return (days, hours, minutes, seconds, ms, us) | ||||
|  | ||||
|     def get_days_to_halloween(self) -> Optional[tuple]: | ||||
|         """ | ||||
|         Get # of Days until Halloween | ||||
|  | ||||
|         Returns: | ||||
|             Optional[tuple] | ||||
|  | ||||
|         """ | ||||
|         today: datetime.datetime = datetime.datetime.now(tz=pytz.UTC) | ||||
|         halloween: datetime.datetime = datetime.datetime( | ||||
|             year=today.year, | ||||
|             month=10, | ||||
|             day=31, | ||||
|             tzinfo=pytz.UTC, | ||||
|         ) | ||||
|         td: datetime.timedelta = halloween - today | ||||
|         days, hours, minutes, seconds, us, ms = self.tdTuple(td) | ||||
|  | ||||
|         return (days, hours, minutes, seconds, ms, us) | ||||
|  | ||||
|     async def get_randmsg(self) -> Optional[str]: | ||||
|         """ | ||||
|         Get Random Message from randmsg.db | ||||
|   | ||||
		Reference in New Issue
	
	Block a user