minor catbox update/minor discord presence update; discord presence code is a shitshow

This commit is contained in:
codey 2025-03-22 07:46:10 -04:00
parent 2a6c0a6039
commit 21c89dd91d
3 changed files with 412 additions and 82 deletions

View File

@ -59,7 +59,7 @@ class Catbox():
'userhash': ''
}
r = requests.post(CATBOX_API_PATH, headers=HEADERS, proxies=self.PROXIES, data=postData, timeout=10, files={'fileToUpload': (self.generateRandomFileName(fileExt), fileContents)})
r = requests.post(CATBOX_API_PATH, headers=HEADERS, proxies=self.PROXIES, data=postData, timeout=(2, 8), files={'fileToUpload': (self.generateRandomFileName(fileExt), fileContents)})
if r.status_code in [200, 301, 302]:
return r.text.strip()

View File

@ -14,21 +14,37 @@ from pypresence import Presence, ActivityType
setproctitle("disc-presence")
class ImageUpload:
def __init__(self) -> None:
self.upload_uri = "https://api.codey.lol/misc/upload_activity_image"
def upload(self, file: bytes) -> None:
try:
r = requests.post(self.upload_uri, files=
{'image': file})
print(f"Response {r.status_code}:\n{r.text}")
r.raise_for_status()
except Exception as e:
print(f"Exception: {str(e)}")
class DBus:
def __init__(self) -> None:
self.session_bus = dbus.SessionBus()
self.catbox = catbox.Catbox()
self.uploader = ImageUpload()
try:
self.player_dbus_proxy = self.session_bus.get_object('org.mpris.MediaPlayer2.playerctld',
'/org/mpris/MediaPlayer2')
self.player_interface = dbus.Interface(self.player_dbus_proxy,
'org.freedesktop.DBus.Properties')
self.interface_details = None
self.interface_metadata = None
self.plex_last = None
self.plex_np = {
'name': None,
'details': None,
'art': None,
'elapsed': None,
'duration': None,
}
except:
pass
@ -36,30 +52,56 @@ class DBus:
def is_plexing(self) -> bool:
try:
self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata')
if "app.plex.tv" in self.interface_details.get('xesam:url'):
self.interface_metadata = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata')
if "app.plex.tv" in self.interface_metadata.get('xesam:url'):
return True
print(f"WTF, NO PLEX?")
except:
print(traceback.format_exc())
return False
def get_now_playing(self) -> dict | bool:
def get_now_playing(self):
try:
self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata')
original_art = str(self.interface_details.get('mpris:artUrl', ''))
self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player')
self.interface_metadata = self.interface_details.get('Metadata')
original_art = str(self.interface_metadata.get('mpris:artUrl', ''))
self.plex_np = {
'name': str(" ".join(self.interface_details.get('xesam:artist'))),
'details': f'{str(self.interface_details.get('xesam:album', ''))} {str(self.interface_details.get('xesam:title'))}',
'name': str(" ".join(self.interface_metadata.get('xesam:artist'))),
'details': f'{str(self.interface_metadata.get('xesam:album', ''))} - {str(self.interface_metadata.get('xesam:title'))}',
# 'elapsed': time.time() - int(self.interface_details.get('Position')/1000000),
# 'duration': int(self.interface_metadata.get('mpris:length')/1000000),
}
if not self.interface_metadata.get('xesam:album'):
self.plex_np['details'] = self.plex_np['details'][2:] # Fix for "'- ' prefix"
if not self.plex_last or\
(self.plex_np.get('name') == self.plex_last.get('name') and self.plex_np.get('details') == self.plex_last.get('details')):
self.plex_np['art'] = self.catbox.upload(original_art.split("file://", maxsplit=1)[1])
if original_art:
original_art = original_art.split("file://", maxsplit=1)[1]
bytes_art = None
try:
with open(original_art, 'rb') as f:
bytes_art = f.read()
if isinstance(bytes_art, bytes):
# self.uploader.upload(bytes_art)
self.plex_np['art'] = 'https://api.codey.lol/misc/get_activity_image'
else:
print(f"bytes art: {type(bytes_art)}")
self.plex_np['art'] = None
except:
traceback.print_exc()
self.plex_np['art'] = None
else:
# print("No original art for this file")
self.plex_np['art'] = None
self.plex_last = self.plex_np
return self.plex_np
except:
self.plex_np = {
'name': None,
'details': None,
'art': None,
'elapsed': None,
'duration': None,
}
print(traceback.format_exc())
return False
@ -74,7 +116,7 @@ class DiscordPresence:
self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX
self.api_key = discord_presence_priv.API_KEY
self.api_url = 'https://api.codey.lol'
self.album_art_url = 'https://codey.lol/images/radio_art.jpg'
self.album_art_url = 'https://api.codey.lol/radio/album_art'
self.api_req_data = {
'bid': 0,
'cmd': 'radio_metadata',
@ -110,120 +152,124 @@ class DiscordPresence:
continue
if self.last_updated and (now - self.last_updated < 5):
time.sleep(0.7)
if self.last_updated and (now - self.last_updated < 15):
time.sleep(0.5)
continue
RPC.client_id = self.client_id
# if RPC.client_id != self.client_id:
# try:
# try:5
# RPC.close()
# except:
# pass
# RPC = Presence(self.client_id, pipe=0)
# RPC.connect()
print(RPC.update(
details=plex_new.get('name'),
state=plex_new.get('details'),
large_image=plex_new.get('art'),
details=plex_new.get('name', '\u2064') if plex_new.get('name') else plex_new.get('details'),
state=plex_new.get('details', None) if plex_new.get('details') else None,
large_image=plex_new.get('art', 'https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png'),
# large_image="https://codey.lol/images/cat.png",
# large_text=f"{rand_msg}" if rand_msg else None,k
large_text=lyrics_current_iteration if lyrics_current_iteration else None,
small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png",
small_text=chosen_image.get('label'),
# small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png",
# small_text=chosen_image.get('label'),
activity_type=ActivityType.WATCHING,
buttons=[
{
"label": "Listen",
"url": "https://codey.lol/radio"
},]
# start=plex_new.get('elapsed'),
# end=plex_new.get('duration')/1000,
))
time.sleep(1)
continue
request = requests.post(f'{self.api_url}/xc/', json=self.api_req_data,
request = requests.post(f'{self.api_url}/radio/np',
headers={
'content-type': 'application/json; charset=utf-8'
}, timeout=(2, 10))
request.raise_for_status()
data = request.json().get('response')
track_artist = data.get('artist')
track_title = data.get('title')
data = request.json()
track = data.get('artistsong')
track_artist = data.get('artist')[0:127]
track_title = data.get('song')[0:127]
track_album = data.get('album', '')[0:127]
track_genre = data.get('genre', '')
track_id = data.get('id')
start_time = data.get('start')
end_time = data.get('end')
track = f"{data.get('artist')} - {data.get('title')}"
if self.last_updated and (now - self.last_updated < 5):
time.sleep(0.7)
if self.last_track == track:
continue
if self.last_track and self.last_track == track:
lyr_start_idx = lyr_end_idx
lyr_end_idx += 1
elif self.last_track:
lyr_start_idx = 0
lyr_end_idx = 1
# if self.last_updated and (now - self.last_updated < 15):
# time.sleep(0.7)
# continue
# if self.last_track and self.last_track == track:
# lyr_start_idx = lyr_end_idx
# lyr_end_idx += 1
# elif self.last_track:
# lyr_start_idx = 0
# lyr_end_idx = 1
try:
# rand_msg_request = requests.post(f'{self.api_url}/randmsg/',
# headers={
# 'content-type': 'application/json; charset=utf-8'
# }, json={
# 'short': True,
# }, timeout=(1, 2))
# rand_msg = rand_msg_request.json().get('msg').strip()
# rand_msg = re.sub(r'(<b>|</b>)', '', re.sub(r'(<br>|<br/>|<br />|\n|\r\n)', ' ', rand_msg))
# if len(rand_msg) > 126:
# rand_msg = None
# try:
# # rand_msg_request = requests.post(f'{self.api_url}/randmsg/',
# # headers={
# # 'content-type': 'application/json; charset=utf-8'
# # }, json={
# # 'short': True,
# # }, timeout=(1, 2))
# # rand_msg = rand_msg_request.json().get('msg').strip()
# # rand_msg = re.sub(r'(<b>|</b>)', '', re.sub(r'(<br>|<br/>|<br />|\n|\r\n)', ' ', rand_msg))
# # if len(rand_msg) > 126:
# # rand_msg = None
if not self.last_track or not self.last_track == track:
lyrics = requests.post(f'{self.api_url}/lyric_search/',
headers={
'content-type': 'application/json; charset=utf-8',
},
json={
'a': track_artist,
's': track_title,
'src': 'WEB-RADIO',
}, timeout=(2,10))
lyrics.raise_for_status()
lyrics_content = lyrics.json().get("lyrics")
# if not self.last_track or not self.last_track == track:
# lyrics = requests.post(f'{self.api_url}/lyric/search',
# headers={
# 'content-type': 'application/json; charset=utf-8',
# },
# json={
# 'a': track_artist,
# 's': track_title,
# 'src': 'WEB-RADIO',
# }, timeout=(2,10))
# lyrics.raise_for_status()
# lyrics_content = lyrics.json().get("lyrics")
lyrics_current_iteration = " / ".join(lyrics_content.split("<br>")[lyr_start_idx:lyr_end_idx]).strip()
if not lyrics_current_iteration.strip():
lyr_start_idx = 0
lyr_end_idx = 1
# lyrics_current_iteration = " / ".join(lyrics_content.split("<br>")[lyr_start_idx:lyr_end_idx]).strip()
# if not lyrics_current_iteration.strip():
# lyr_start_idx = 0
# lyr_end_idx = 1
if len(lyrics_current_iteration) > 128:
lyrics_current_iteration = lyrics_current_iteration.split(" / ", maxsplit=1)[0].strip()
# if len(lyrics_current_iteration) > 128:
# lyrics_current_iteration = lyrics_current_iteration.split(" / ", maxsplit=1)[0].strip()
except:
try:
lyrics_current_iteration = " / ".join(lyrics_content.split("<br>")[0]).strip()
except:
lyrics_content = lyrics_current_iteration = None
print("FAILED TO GET RANDMSG/LYRICS")
print(traceback.format_exc())
# except:
# try:
# lyrics_current_iteration = " / ".join(lyrics_content.split("<br>")[0]).strip()
# except:
# lyrics_content = lyrics_current_iteration = None
# print("FAILED TO GET RANDMSG/LYRICS")
# print(traceback.format_exc())
self.last_updated = now
self.last_track = track
lyrics_current_iteration = None # disable
print(RPC.update(
details=track_title,
state=track_artist,
large_image=f"{self.album_art_url}?t={now}",
state=f"{track_artist[0:100]} [{track_genre}]"[0:127],
large_image=f"{self.album_art_url}?t={now}&track_id={track_id}",
# large_image="https://codey.lol/images/cat.png",
# large_text=f"{rand_msg}" if rand_msg else None,k
large_text=lyrics_current_iteration if lyrics_current_iteration else None,
small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png",
small_text=chosen_image.get('label'),
large_text=track_album if track_album else None,
# small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png",
# small_text=chosen_image.get('label'),
start=start_time,
end=end_time,
activity_type=ActivityType.LISTENING,
@ -247,7 +293,7 @@ class DiscordPresence:
# }
except:
print(traceback.format_exc())
time.sleep(5)
time.sleep(0.7)
continue
except:
print(traceback.format_exc())

284
discord_presence.py.1 Normal file
View File

@ -0,0 +1,284 @@
#!/usr/bin/env python3.12
import time
import traceback
import requests
import dbus
import re
import random
import discord_presence_priv
import catbox
from setproctitle import setproctitle
from pypresence import Presence, ActivityType
setproctitle("disc-presence")
class DBus:
def __init__(self) -> None:
self.session_bus = dbus.SessionBus()
self.catbox = catbox.Catbox()
try:
self.player_dbus_proxy = self.session_bus.get_object('org.mpris.MediaPlayer2.playerctld',
'/org/mpris/MediaPlayer2')
self.player_interface = dbus.Interface(self.player_dbus_proxy,
'org.freedesktop.DBus.Properties')
self.interface_metadata = None
self.plex_last = None
self.plex_np = {
'name': None,
'details': None,
'art': None,
'elapsed': None,
'duration': None,
}
except:
pass
def is_plexing(self) -> bool:
try:
self.interface_metadata = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata')
if "app.plex.tv" in self.interface_metadata.get('xesam:url'):
return True
print(f"WTF, NO PLEX?")
except:
print(traceback.format_exc())
return False
def get_now_playing(self) -> dict | bool:
try:
self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player')
self.interface_metadata = self.interface_details.get('Metadata')
original_art = str(self.interface_metadata.get('mpris:artUrl', ''))
self.plex_np = {
'name': str(" ".join(self.interface_metadata.get('xesam:artist'))),
'details': f'{str(self.interface_metadata.get('xesam:album', ''))} - {str(self.interface_metadata.get('xesam:title'))}',
# 'elapsed': time.time() - int(self.interface_details.get('Position')/1000000),
# 'duration': int(self.interface_metadata.get('mpris:length')/1000000),
}
if not self.plex_last or\
(self.plex_np.get('name') == self.plex_last.get('name') and self.plex_np.get('details') == self.plex_last.get('details')):
if original_art:
try:
uploaded = self.catbox.upload(original_art.split("file://", maxsplit=1)[1])
if uploaded:
self.plex_np['art'] = uploaded
else:
self.plex_np['art'] = None
except:
self.plex_np['art'] = None
else:
self.plex_np['art'] = None
self.plex_
return self.plex_np
except:
self.plex_np = {
'name': None,
'details': None,
'art': None,
'elapsed': None,
'duration': None,
}
print(traceback.format_exc())
return False
class DiscordPresence:
def __init__(self):
self.dbus = DBus()
self.last_updated = None
self.last_track = None
self.client_id = discord_presence_priv.CLIENT_ID
self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX
self.api_key = discord_presence_priv.API_KEY
self.api_url = 'https://api.codey.lol'
self.album_art_url = 'https://codey.lol/images/radio_art.jpg'
self.api_req_data = {
'bid': 0,
'cmd': 'radio_metadata',
'key': f'Bearer {self.api_key}',
}
def loop(self):
try:
lyr_start_idx = 0
lyr_end_idx = 1
rand_msg = None
lyrics_content = None
lyrics_current_iteration = None
chosen_image = random.choice([
{
'image': "qu",
'label': "quietscheentchen",
},
# {
# 'image': "rooster",
# 'label': ":3",
# }
])
RPC = Presence(self.client_id, pipe=0)
RPC.connect()
while True:
try:
now = time.time()
plex = self.dbus.is_plexing()
if plex:
plex_current = self.dbus.plex_np
plex_new = self.dbus.get_now_playing()
if plex_current == plex_new:
continue
if self.last_updated and (now - self.last_updated < 5):
time.sleep(0.7)
continue
RPC.client_id = self.client_id
# if RPC.client_id != self.client_id:
# try:
# RPC.close()
# except:
# pass
# RPC = Presence(self.client_id, pipe=0)
# RPC.connect()
print(RPC.update(
details=plex_new.get('name'),
state=plex_new.get('details'),
large_image=plex_new.get('art', 'https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png'),
# large_image="https://codey.lol/images/cat.png",
# large_text=f"{rand_msg}" if rand_msg else None,k
large_text=lyrics_current_iteration if lyrics_current_iteration else None,
small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png",
small_text=chosen_image.get('label'),
activity_type=ActivityType.WATCHING,
# start=plex_new.get('elapsed'),
# end=plex_new.get('duration')/1000,
))
time.sleep(1)
continue
request = requests.post(f'{self.api_url}/xc/', json=self.api_req_data,
headers={
'content-type': 'application/json; charset=utf-8'
}, timeout=(2, 10))
request.raise_for_status()
data = request.json().get('response')
track_artist = data.get('artist')
track_title = data.get('title')
start_time = data.get('start')
end_time = data.get('end')
track = f"{data.get('artist')} - {data.get('title')}"
if self.last_updated and (now - self.last_updated < 5):
time.sleep(0.7)
continue
if self.last_track and self.last_track == track:
lyr_start_idx = lyr_end_idx
lyr_end_idx += 1
elif self.last_track:
lyr_start_idx = 0
lyr_end_idx = 1
try:
# rand_msg_request = requests.post(f'{self.api_url}/randmsg/',
# headers={
# 'content-type': 'application/json; charset=utf-8'
# }, json={
# 'short': True,
# }, timeout=(1, 2))
# rand_msg = rand_msg_request.json().get('msg').strip()
# rand_msg = re.sub(r'(<b>|</b>)', '', re.sub(r'(<br>|<br/>|<br />|\n|\r\n)', ' ', rand_msg))
# if len(rand_msg) > 126:
# rand_msg = None
if not self.last_track or not self.last_track == track:
lyrics = requests.post(f'{self.api_url}/lyric_search/',
headers={
'content-type': 'application/json; charset=utf-8',
},
json={
'a': track_artist,
's': track_title,
'src': 'WEB-RADIO',
}, timeout=(2,10))
lyrics.raise_for_status()
lyrics_content = lyrics.json().get("lyrics")
lyrics_current_iteration = " / ".join(lyrics_content.split("<br>")[lyr_start_idx:lyr_end_idx]).strip()
if not lyrics_current_iteration.strip():
lyr_start_idx = 0
lyr_end_idx = 1
if len(lyrics_current_iteration) > 128:
lyrics_current_iteration = lyrics_current_iteration.split(" / ", maxsplit=1)[0].strip()
except:
try:
lyrics_current_iteration = " / ".join(lyrics_content.split("<br>")[0]).strip()
except:
lyrics_content = lyrics_current_iteration = None
print("FAILED TO GET RANDMSG/LYRICS")
print(traceback.format_exc())
self.last_updated = now
self.last_track = track
print(RPC.update(
details=track_title,
state=track_artist,
large_image=f"{self.album_art_url}?t={now}",
# large_image="https://codey.lol/images/cat.png",
# large_text=f"{rand_msg}" if rand_msg else None,k
large_text=lyrics_current_iteration if lyrics_current_iteration else None,
small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png",
small_text=chosen_image.get('label'),
start=start_time,
end=end_time,
activity_type=ActivityType.LISTENING,
buttons=[
{
"label": "Listen",
"url": "https://codey.lol/radio"
},]
))
# match chosen_image.get('image'):
# case "rooster":
# chosen_image = {
# 'image': "qu",
# 'label': "rubber duck :3"
# }
# case "qu":
# chosen_image = {
# 'image': "rooster",
# 'label': ":3",
# }
except:
print(traceback.format_exc())
time.sleep(5)
continue
except:
print(traceback.format_exc())
try:
self.loop()
except:
pass
def __init__():
discord_presence = DiscordPresence()
discord_presence.loop()
if __name__ == "__main__":
__init__()