2025-02-16 13:16:53 -05:00
from typing import Optional
from aiohttp import ClientSession , ClientTimeout , FormData
import traceback
import logging
import random
import os
import magic
"""
Catbox Uploader (Async)
"""
# Catbox API endpoint used for uploads.
catbox_api_url: str = "https://catbox.moe/user/api.php"

# Default HTTP headers sent with each upload request.
# NOTE(review): the referer and cookie values look like they were captured
# from an unrelated browser session -- confirm they are actually required.
http_headers: dict[str, str] = {
    'accept': '*/*',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53',
    'Accept-Language': 'en-US,en;q=0.9,it;q=0.8,es;q=0.7',
    'referer': 'https://www.google.com/',
    'cookie': 'DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0',
}
class CatboxAsync:
    """Asynchronous uploader that posts local files to the catbox API."""

    def __init__(self) -> None:
        # Endpoint and request headers come from the module-level defaults.
        self.catbox_api_url = catbox_api_url
        self.headers = http_headers

    def generateRandomFileName(self, fileExt: Optional[str] = None) -> str:
        """
        Generate random file name

        Args:
            fileExt (Optional[str]): File extension to use for naming, with or
                without a leading dot. Falls back to ``png`` when missing or
                empty.
        Returns:
            str: ``<random 32-bit integer>.<extension>``
        """
        # Strip a leading dot so ".png" does not produce "123..png".
        extension = (fileExt or "").lstrip(".") or "png"
        return f"{random.getrandbits(32)}.{extension}"

    async def upload(self, file: str) -> Optional[str]:
        """
        Upload file to catbox

        Args:
            file (str): Path of file to be uploaded
        Returns:
            Optional[str]: Body of the API response on success (presumably the
            URL of the uploaded file -- confirm against the catbox API), or
            ``None`` when ``file`` is empty, does not exist, or the upload
            fails.
        """
        try:
            if not file:
                return None
            if not os.path.exists(file):
                logging.critical("Could not find %s", file)
                return None

            # splitext only inspects the last path component, so a dot in a
            # directory name is never mistaken for a file extension.
            fileExt: Optional[str] = os.path.splitext(file)[1].lstrip(".") or None

            # The file must stay open until the request completes because the
            # form payload reads from the handle; ``with`` closes it afterwards.
            with open(file, "rb") as fileContents:
                post_data: FormData = FormData()
                post_data.add_field(name="reqtype", value="fileupload")
                post_data.add_field(name="userhash", value="")
                with magic.Magic(flags=magic.MAGIC_MIME) as m:
                    content_type = m.id_filename(file)
                post_data.add_field(
                    name="fileToUpload",
                    value=fileContents,
                    filename=self.generateRandomFileName(fileExt),
                    content_type=content_type,
                )
                async with ClientSession() as session:
                    async with session.post(
                        self.catbox_api_url,
                        headers=self.headers,
                        data=post_data,
                        timeout=ClientTimeout(connect=10, sock_read=10),
                    ) as response:
                        response.raise_for_status()
                        return await response.text()
        except Exception:
            # ``Exception`` (not a bare ``except``) so task cancellation and
            # KeyboardInterrupt still propagate to the caller.
            traceback.print_exc()
            return None