misc
This commit is contained in:
parent
f61c437bf5
commit
a8014f286a
88
catbox.py
Normal file
88
catbox.py
Normal file
@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env python3.12
|
||||
|
||||
from typing import Optional
|
||||
from aiohttp import ClientSession, ClientTimeout, FormData
|
||||
import traceback
|
||||
import logging
|
||||
import random
|
||||
import os
|
||||
import magic
|
||||
|
||||
"""
|
||||
Catbox Uploader (Async)
|
||||
"""
|
||||
|
||||
catbox_api_url: str = "https://catbox.moe/user/api.php"
|
||||
http_headers: dict[str, str] = {
|
||||
'accept': '*/*',
|
||||
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
|
||||
'Accept-Language': 'en-US,en;q=0.9,it;q=0.8,es;q=0.7',
|
||||
'referer': 'https://www.google.com/',
|
||||
'cookie': 'DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0'
|
||||
}
|
||||
|
||||
class CatboxAsync:
|
||||
def generateRandomFileName(self, fileExt: Optional[str] = None) -> str:
|
||||
"""
|
||||
Generate random file name
|
||||
Args:
|
||||
fileExt (Optional[str]): File extension to use for naming
|
||||
Returns:
|
||||
str
|
||||
"""
|
||||
if not fileExt:
|
||||
fileExt = 'png'
|
||||
return f"{random.getrandbits(32)}.{fileExt}"
|
||||
|
||||
def __init__(self):
|
||||
self.catbox_api_url = catbox_api_url
|
||||
self.headers = http_headers
|
||||
|
||||
async def upload(self, file: str) -> Optional[str]:
|
||||
"""
|
||||
Upload file to catbox
|
||||
Args:
|
||||
file (str): Path of file to be uploaded
|
||||
Returns:
|
||||
str
|
||||
"""
|
||||
try:
|
||||
if not file:
|
||||
return None
|
||||
if not(os.path.exists(file)):
|
||||
logging.critical("Could not find %s",
|
||||
file)
|
||||
return None
|
||||
|
||||
fileExt: Optional[str] = None
|
||||
if file.find(".") > 0:
|
||||
fileExt = "".join(file.split(".")[-1:])
|
||||
|
||||
with open(file, 'rb') as fileContents:
|
||||
post_data: FormData = FormData()
|
||||
post_data.add_field(name="reqtype",
|
||||
value="fileupload")
|
||||
post_data.add_field(name="userhash",
|
||||
value="")
|
||||
with magic.Magic(flags=magic.MAGIC_MIME) as m:
|
||||
content_type = m.id_filename(file)
|
||||
post_data.add_field(name="fileToUpload",
|
||||
value=fileContents,
|
||||
filename=self.generateRandomFileName(fileExt),
|
||||
content_type=content_type
|
||||
)
|
||||
async with ClientSession() as session:
|
||||
async with await session.post(self.catbox_api_url,
|
||||
headers=self.headers,
|
||||
data=post_data,
|
||||
timeout=ClientTimeout(connect=10, sock_read=10)) as request:
|
||||
request.raise_for_status()
|
||||
return await request.text()
|
||||
except:
|
||||
traceback.print_exc()
|
||||
return None
|
||||
finally:
|
||||
try:
|
||||
fileContents.close()
|
||||
except:
|
||||
return None
|
@ -106,13 +106,13 @@ class Radio(commands.Cog):
|
||||
but they exist.
|
||||
"""
|
||||
logging.info("Detected VC not playing... playing!")
|
||||
source = discord.FFmpegOpusAudio(self.STREAM_URL,
|
||||
source: discord.FFmpegAudio = discord.FFmpegOpusAudio(self.STREAM_URL,
|
||||
before_options="-timeout 3000000")
|
||||
|
||||
vc.play(source, after=lambda e: logging.info("Error: %s", e) if e else None) # type: ignore
|
||||
|
||||
vc.play(source, # type: ignore
|
||||
after=lambda e: logging.info("Error: %s", e) if e\
|
||||
else None)
|
||||
# Get Now Playing
|
||||
np_track = await get_now_playing()
|
||||
np_track: Optional[str] = await get_now_playing()
|
||||
if np_track and not self.LAST_NP_TRACK == np_track:
|
||||
self.LAST_NP_TRACK = np_track
|
||||
if isinstance(vc.channel, discord.VoiceChannel):
|
||||
|
@ -6,9 +6,9 @@ import regex
|
||||
from regex import Pattern
|
||||
import os
|
||||
import random
|
||||
import logging
|
||||
import traceback
|
||||
from urllib.parse import quote as urlquote
|
||||
from catbox import Catbox
|
||||
from catbox import CatboxAsync
|
||||
|
||||
"""
|
||||
Jesus Meme Generator
|
||||
@ -65,8 +65,11 @@ class JesusMemeGenerator():
|
||||
f.write(await response.read())
|
||||
|
||||
if not out_fname:
|
||||
uploader = Catbox()
|
||||
meme_link: str = uploader.upload(f"{self.MEMESTORAGEDIR}/{out_fname}")
|
||||
uploader = CatboxAsync()
|
||||
meme_link: Optional[str] = await uploader.upload(f"{self.MEMESTORAGEDIR}/{out_fname}")
|
||||
if not meme_link:
|
||||
logging.info("Meme upload failed!")
|
||||
return None
|
||||
return meme_link
|
||||
except:
|
||||
print(traceback.format_exc())
|
||||
|
104
litterbox.py
Normal file
104
litterbox.py
Normal file
@ -0,0 +1,104 @@
|
||||
#!/usr/bin/env python3.12
|
||||
|
||||
"""
|
||||
Litterbox (Catbox) Uploader (Async)
|
||||
"""
|
||||
|
||||
import traceback
|
||||
from typing import Optional, Union
|
||||
from io import BufferedReader
|
||||
from aiohttp import ClientSession, ClientTimeout, FormData
|
||||
import magic
|
||||
import logging
|
||||
import random
|
||||
import os
|
||||
|
||||
litterbox_api_url: str = "https://litterbox.catbox.moe/resources/internals/api.php"
|
||||
http_headers: dict[str, str] = {
|
||||
'accept': '*/*',
|
||||
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
|
||||
'Accept-Language': 'en-US,en;q=0.9,it;q=0.8,es;q=0.7',
|
||||
'referer': 'https://www.google.com/',
|
||||
'cookie': 'DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0'
|
||||
}
|
||||
|
||||
class LitterboxAsync:
|
||||
def __init__(self) -> None:
|
||||
self.headers: dict[str, str] = http_headers
|
||||
self.api_path = litterbox_api_url
|
||||
|
||||
def generateRandomFileName(self, fileExt: Optional[str] = None) -> str:
|
||||
"""
|
||||
Generate Random Filename
|
||||
Args:
|
||||
fileExt (Optional[str]): File extension to use for naming
|
||||
Returns:
|
||||
str
|
||||
"""
|
||||
if not fileExt:
|
||||
fileExt = 'png'
|
||||
return f"{random.getrandbits(32)}.{fileExt}"
|
||||
|
||||
async def upload(self,
|
||||
file: Union[str, bytes, BufferedReader],
|
||||
time='1h') -> Optional[str]:
|
||||
"""
|
||||
Upload File to Litterbox
|
||||
Args:
|
||||
file (Union[str, bytes, BufferedReader]): File to upload (accepts either filepath or io.BufferedReader)
|
||||
time (str): Expiration time, default: 1h
|
||||
Returns:
|
||||
Optional[str]
|
||||
"""
|
||||
try:
|
||||
if not file:
|
||||
return None
|
||||
if isinstance(file, str):
|
||||
if not os.path.exists(file):
|
||||
logging.critical("Could not find %s", file)
|
||||
return None
|
||||
if isinstance(file, BufferedReader):
|
||||
file = file.read()
|
||||
|
||||
fileExt: str = 'png'
|
||||
if isinstance(file, str):
|
||||
if file.find(".") > 0:
|
||||
fileExt = "".join(file.split(".")[-1:])
|
||||
|
||||
file = open(file, 'rb').read()
|
||||
|
||||
with magic.Magic(flags=magic.MAGIC_MIME) as m:
|
||||
if isinstance(file, BufferedReader):
|
||||
content_type = str(m.id_buffer(file))
|
||||
else:
|
||||
content_type = str(m.id_filename(file))
|
||||
|
||||
post_data: FormData = FormData()
|
||||
post_data.add_field(name="reqtype",
|
||||
value="fileupload")
|
||||
post_data.add_field(name="userhash",
|
||||
value="")
|
||||
post_data.add_field(name="time",
|
||||
value=time)
|
||||
|
||||
post_data.add_field(name="fileToUpload",
|
||||
value=file,
|
||||
filename=self.generateRandomFileName(fileExt),
|
||||
content_type=content_type
|
||||
)
|
||||
async with ClientSession() as session:
|
||||
async with await session.post(self.api_path,
|
||||
headers=self.headers,
|
||||
data=post_data,
|
||||
timeout=ClientTimeout(connect=5, sock_read=70)) as request:
|
||||
request.raise_for_status()
|
||||
return await request.text()
|
||||
except:
|
||||
traceback.print_exc()
|
||||
return None
|
||||
finally:
|
||||
if isinstance(file, BufferedReader):
|
||||
try:
|
||||
file.close()
|
||||
except:
|
||||
return None
|
25
litterbox_test.py
Normal file
25
litterbox_test.py
Normal file
@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env python3.12
|
||||
|
||||
"""Tests for both Catbox & Litterbox"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
from litterbox import LitterboxAsync
|
||||
from catbox import CatboxAsync
|
||||
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
async def test() -> None:
|
||||
f = os.path.join(os.path.expanduser("~"), "qu.png")
|
||||
box1: LitterboxAsync = LitterboxAsync()
|
||||
box2: CatboxAsync = CatboxAsync()
|
||||
url1: Optional[str] = await box1.upload(f)
|
||||
url2: Optional[str] = await box2.upload(f)
|
||||
logging.info("""Uploaded URLs:
|
||||
Litter - %s\n
|
||||
Cat - %s""", url1, url2)
|
||||
|
||||
asyncio.run(test())
|
@ -1,11 +1,12 @@
|
||||
#!/usr/bin/env python3.12
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
from aiohttp import ClientSession, ClientTimeout
|
||||
|
||||
"""Radio Utils"""
|
||||
|
||||
async def get_now_playing() -> str:
|
||||
async def get_now_playing() -> Optional[str]:
|
||||
np_url: str = "https://api.codey.lol/radio/np"
|
||||
try:
|
||||
async with ClientSession() as session:
|
||||
@ -19,3 +20,4 @@ async def get_now_playing() -> str:
|
||||
except Exception as e:
|
||||
logging.critical("Now playing retrieval failed: %s",
|
||||
str(e))
|
||||
return None
|
Loading…
x
Reference in New Issue
Block a user