diff --git a/catbox.py b/catbox.py index 15504e8..cf88896 100644 --- a/catbox.py +++ b/catbox.py @@ -59,7 +59,7 @@ class Catbox(): 'userhash': '' } - r = requests.post(CATBOX_API_PATH, headers=HEADERS, proxies=self.PROXIES, data=postData, timeout=10, files={'fileToUpload': (self.generateRandomFileName(fileExt), fileContents)}) + r = requests.post(CATBOX_API_PATH, headers=HEADERS, proxies=self.PROXIES, data=postData, timeout=(2, 8), files={'fileToUpload': (self.generateRandomFileName(fileExt), fileContents)}) if r.status_code in [200, 301, 302]: return r.text.strip() diff --git a/discord_presence.py b/discord_presence.py index e0e57c3..c811acf 100644 --- a/discord_presence.py +++ b/discord_presence.py @@ -14,21 +14,37 @@ from pypresence import Presence, ActivityType setproctitle("disc-presence") +class ImageUpload: + def __init__(self) -> None: + self.upload_uri = "https://api.codey.lol/misc/upload_activity_image" + + def upload(self, file: bytes) -> None: + try: + r = requests.post(self.upload_uri, files= + {'image': file}) + print(f"Response {r.status_code}:\n{r.text}") + r.raise_for_status() + except Exception as e: + print(f"Exception: {str(e)}") + class DBus: def __init__(self) -> None: self.session_bus = dbus.SessionBus() self.catbox = catbox.Catbox() + self.uploader = ImageUpload() try: self.player_dbus_proxy = self.session_bus.get_object('org.mpris.MediaPlayer2.playerctld', '/org/mpris/MediaPlayer2') self.player_interface = dbus.Interface(self.player_dbus_proxy, 'org.freedesktop.DBus.Properties') - self.interface_details = None + self.interface_metadata = None self.plex_last = None self.plex_np = { 'name': None, 'details': None, 'art': None, + 'elapsed': None, + 'duration': None, } except: pass @@ -36,30 +52,56 @@ class DBus: def is_plexing(self) -> bool: try: - self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata') - if "app.plex.tv" in self.interface_details.get('xesam:url'): + self.interface_metadata = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata') + if "app.plex.tv" in self.interface_metadata.get('xesam:url'): return True + print(f"WTF, NO PLEX?") except: print(traceback.format_exc()) return False - def get_now_playing(self) -> dict | bool: + def get_now_playing(self): try: - self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata') - original_art = str(self.interface_details.get('mpris:artUrl', '')) + self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player') + self.interface_metadata = self.interface_details.get('Metadata') + original_art = str(self.interface_metadata.get('mpris:artUrl', '')) self.plex_np = { - 'name': str(" ".join(self.interface_details.get('xesam:artist'))), - 'details': f'{str(self.interface_details.get('xesam:album', ''))} {str(self.interface_details.get('xesam:title'))}', + 'name': str(" ".join(self.interface_metadata.get('xesam:artist'))), + 'details': f'{str(self.interface_metadata.get('xesam:album', ''))} - {str(self.interface_metadata.get('xesam:title'))}', + # 'elapsed': time.time() - int(self.interface_details.get('Position')/1000000), + # 'duration': int(self.interface_metadata.get('mpris:length')/1000000), } + if not self.interface_metadata.get('xesam:album'): + self.plex_np['details'] = self.plex_np['details'][2:] # Fix for "'- ' prefix" if not self.plex_last or\ (self.plex_np.get('name') == self.plex_last.get('name') and self.plex_np.get('details') == self.plex_last.get('details')): - self.plex_np['art'] = self.catbox.upload(original_art.split("file://", maxsplit=1)[1]) + if original_art: + original_art = original_art.split("file://", maxsplit=1)[1] + bytes_art = None + try: + with open(original_art, 'rb') as f: + bytes_art = f.read() + if isinstance(bytes_art, bytes): + # self.uploader.upload(bytes_art) + self.plex_np['art'] = 'https://api.codey.lol/misc/get_activity_image' + else: + print(f"bytes art: {type(bytes_art)}") + self.plex_np['art'] = None + except: + traceback.print_exc() + self.plex_np['art'] = None + else: + # print("No original art for this file") + self.plex_np['art'] = None + self.plex_last = self.plex_np return self.plex_np except: self.plex_np = { 'name': None, 'details': None, 'art': None, + 'elapsed': None, + 'duration': None, } print(traceback.format_exc()) return False @@ -74,7 +116,7 @@ class DiscordPresence: self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX self.api_key = discord_presence_priv.API_KEY self.api_url = 'https://api.codey.lol' - self.album_art_url = 'https://codey.lol/images/radio_art.jpg' + self.album_art_url = 'https://api.codey.lol/radio/album_art' self.api_req_data = { 'bid': 0, 'cmd': 'radio_metadata', @@ -98,7 +140,7 @@ class DiscordPresence: # } ]) RPC = Presence(self.client_id, pipe=0) - RPC.connect() + RPC.connect() while True: try: now = time.time() @@ -110,120 +152,124 @@ class DiscordPresence: continue - if self.last_updated and (now - self.last_updated < 5): - time.sleep(0.7) + if self.last_updated and (now - self.last_updated < 15): + time.sleep(0.5) continue RPC.client_id = self.client_id # if RPC.client_id != self.client_id: - # try: + # try:5 # RPC.close() # except: # pass # RPC = Presence(self.client_id, pipe=0) # RPC.connect() - print(RPC.update( - details=plex_new.get('name'), - state=plex_new.get('details'), - large_image=plex_new.get('art'), + details=plex_new.get('name', '\u2064') if plex_new.get('name') else plex_new.get('details'), + state=plex_new.get('details', None) if plex_new.get('details') else None, + large_image=plex_new.get('art', 'https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png'), # large_image="https://codey.lol/images/cat.png", # large_text=f"{rand_msg}" if rand_msg else None,k large_text=lyrics_current_iteration if lyrics_current_iteration else None, - small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", - small_text=chosen_image.get('label'), + # small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", + # small_text=chosen_image.get('label'), activity_type=ActivityType.WATCHING, - buttons=[ - { - "label": "Listen", - "url": "https://codey.lol/radio" - },] + # start=plex_new.get('elapsed'), + # end=plex_new.get('duration')/1000, )) - + time.sleep(1) continue - request = requests.post(f'{self.api_url}/xc/', json=self.api_req_data, + request = requests.post(f'{self.api_url}/radio/np', headers={ 'content-type': 'application/json; charset=utf-8' }, timeout=(2, 10)) request.raise_for_status() - data = request.json().get('response') - track_artist = data.get('artist') - track_title = data.get('title') + data = request.json() + track = data.get('artistsong') + track_artist = data.get('artist')[0:127] + track_title = data.get('song')[0:127] + track_album = data.get('album', '')[0:127] + track_genre = data.get('genre', '') + track_id = data.get('id') start_time = data.get('start') end_time = data.get('end') - track = f"{data.get('artist')} - {data.get('title')}" - if self.last_updated and (now - self.last_updated < 5): - time.sleep(0.7) + + if self.last_track == track: continue + + # if self.last_updated and (now - self.last_updated < 15): + # time.sleep(0.7) + # continue - if self.last_track and self.last_track == track: - lyr_start_idx = lyr_end_idx - lyr_end_idx += 1 - elif self.last_track: - lyr_start_idx = 0 - lyr_end_idx = 1 + # if self.last_track and self.last_track == track: + # lyr_start_idx = lyr_end_idx + # lyr_end_idx += 1 + # elif self.last_track: + # lyr_start_idx = 0 + # lyr_end_idx = 1 - try: - # rand_msg_request = requests.post(f'{self.api_url}/randmsg/', - # headers={ - # 'content-type': 'application/json; charset=utf-8' - # }, json={ - # 'short': True, - # }, timeout=(1, 2)) - # rand_msg = rand_msg_request.json().get('msg').strip() - # rand_msg = re.sub(r'(|)', '', re.sub(r'(
|
|
|\n|\r\n)', ' ', rand_msg)) - # if len(rand_msg) > 126: - # rand_msg = None + # try: + # # rand_msg_request = requests.post(f'{self.api_url}/randmsg/', + # # headers={ + # # 'content-type': 'application/json; charset=utf-8' + # # }, json={ + # # 'short': True, + # # }, timeout=(1, 2)) + # # rand_msg = rand_msg_request.json().get('msg').strip() + # # rand_msg = re.sub(r'(|)', '', re.sub(r'(
|
|
|\n|\r\n)', ' ', rand_msg)) + # # if len(rand_msg) > 126: + # # rand_msg = None - if not self.last_track or not self.last_track == track: - lyrics = requests.post(f'{self.api_url}/lyric_search/', - headers={ - 'content-type': 'application/json; charset=utf-8', - }, - json={ - 'a': track_artist, - 's': track_title, - 'src': 'WEB-RADIO', - }, timeout=(2,10)) - lyrics.raise_for_status() - lyrics_content = lyrics.json().get("lyrics") + # if not self.last_track or not self.last_track == track: + # lyrics = requests.post(f'{self.api_url}/lyric/search', + # headers={ + # 'content-type': 'application/json; charset=utf-8', + # }, + # json={ + # 'a': track_artist, + # 's': track_title, + # 'src': 'WEB-RADIO', + # }, timeout=(2,10)) + # lyrics.raise_for_status() + # lyrics_content = lyrics.json().get("lyrics") - lyrics_current_iteration = " / ".join(lyrics_content.split("
")[lyr_start_idx:lyr_end_idx]).strip() - if not lyrics_current_iteration.strip(): - lyr_start_idx = 0 - lyr_end_idx = 1 + # lyrics_current_iteration = " / ".join(lyrics_content.split("
")[lyr_start_idx:lyr_end_idx]).strip() + # if not lyrics_current_iteration.strip(): + # lyr_start_idx = 0 + # lyr_end_idx = 1 - if len(lyrics_current_iteration) > 128: - lyrics_current_iteration = lyrics_current_iteration.split(" / ", maxsplit=1)[0].strip() + # if len(lyrics_current_iteration) > 128: + # lyrics_current_iteration = lyrics_current_iteration.split(" / ", maxsplit=1)[0].strip() - except: - try: - lyrics_current_iteration = " / ".join(lyrics_content.split("
")[0]).strip() - except: - lyrics_content = lyrics_current_iteration = None - print("FAILED TO GET RANDMSG/LYRICS") - print(traceback.format_exc()) + # except: + # try: + # lyrics_current_iteration = " / ".join(lyrics_content.split("
")[0]).strip() + # except: + # lyrics_content = lyrics_current_iteration = None + # print("FAILED TO GET RANDMSG/LYRICS") + # print(traceback.format_exc()) self.last_updated = now self.last_track = track + lyrics_current_iteration = None # disable print(RPC.update( details=track_title, - state=track_artist, - large_image=f"{self.album_art_url}?t={now}", + state=f"{track_artist[0:100]} [{track_genre}]"[0:127], + large_image=f"{self.album_art_url}?t={now}&track_id={track_id}", # large_image="https://codey.lol/images/cat.png", # large_text=f"{rand_msg}" if rand_msg else None,k - large_text=lyrics_current_iteration if lyrics_current_iteration else None, - small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", - small_text=chosen_image.get('label'), + large_text=track_album if track_album else None, + # small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", + # small_text=chosen_image.get('label'), start=start_time, end=end_time, activity_type=ActivityType.LISTENING, @@ -247,7 +293,7 @@ class DiscordPresence: # } except: print(traceback.format_exc()) - time.sleep(5) + time.sleep(0.7) continue except: print(traceback.format_exc()) diff --git a/discord_presence.py.1 b/discord_presence.py.1 new file mode 100644 index 0000000..dea0a70 --- /dev/null +++ b/discord_presence.py.1 @@ -0,0 +1,284 @@ +#!/usr/bin/env python3.12 + +import time +import traceback +import requests +import dbus +import re +import random +import discord_presence_priv +import catbox +from setproctitle import setproctitle + +from pypresence import Presence, ActivityType + +setproctitle("disc-presence") + +class DBus: + def __init__(self) -> None: + self.session_bus = dbus.SessionBus() + self.catbox = catbox.Catbox() + try: + self.player_dbus_proxy = self.session_bus.get_object('org.mpris.MediaPlayer2.playerctld', + '/org/mpris/MediaPlayer2') + self.player_interface = dbus.Interface(self.player_dbus_proxy, + 'org.freedesktop.DBus.Properties') + self.interface_metadata = None + self.plex_last = None + self.plex_np = { + 'name': None, + 'details': None, + 'art': None, + 'elapsed': None, + 'duration': None, + } + except: + pass + + + def is_plexing(self) -> bool: + try: + self.interface_metadata = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata') + if "app.plex.tv" in self.interface_metadata.get('xesam:url'): + return True + print(f"WTF, NO PLEX?") + except: + print(traceback.format_exc()) + return False + + def get_now_playing(self) -> dict | bool: + try: + self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player') + self.interface_metadata = self.interface_details.get('Metadata') + original_art = str(self.interface_metadata.get('mpris:artUrl', '')) + self.plex_np = { + 'name': str(" ".join(self.interface_metadata.get('xesam:artist'))), + 'details': f'{str(self.interface_metadata.get('xesam:album', ''))} - {str(self.interface_metadata.get('xesam:title'))}', + # 'elapsed': time.time() - int(self.interface_details.get('Position')/1000000), + # 'duration': int(self.interface_metadata.get('mpris:length')/1000000), + + } + if not self.plex_last or\ + (self.plex_np.get('name') == self.plex_last.get('name') and self.plex_np.get('details') == self.plex_last.get('details')): + if original_art: + try: + uploaded = self.catbox.upload(original_art.split("file://", maxsplit=1)[1]) + if uploaded: + self.plex_np['art'] = uploaded + else: + self.plex_np['art'] = None + except: + self.plex_np['art'] = None + else: + self.plex_np['art'] = None + self.plex_ + return self.plex_np + except: + self.plex_np = { + 'name': None, + 'details': None, + 'art': None, + 'elapsed': None, + 'duration': None, + } + print(traceback.format_exc()) + return False + + +class DiscordPresence: + def __init__(self): + self.dbus = DBus() + self.last_updated = None + self.last_track = None + self.client_id = discord_presence_priv.CLIENT_ID + self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX + self.api_key = discord_presence_priv.API_KEY + self.api_url = 'https://api.codey.lol' + self.album_art_url = 'https://codey.lol/images/radio_art.jpg' + self.api_req_data = { + 'bid': 0, + 'cmd': 'radio_metadata', + 'key': f'Bearer {self.api_key}', + } + def loop(self): + try: + lyr_start_idx = 0 + lyr_end_idx = 1 + rand_msg = None + lyrics_content = None + lyrics_current_iteration = None + chosen_image = random.choice([ + { + 'image': "qu", + 'label': "quietscheentchen", + }, + # { + # 'image': "rooster", + # 'label': ":3", + # } + ]) + RPC = Presence(self.client_id, pipe=0) + RPC.connect() + while True: + try: + now = time.time() + plex = self.dbus.is_plexing() + if plex: + plex_current = self.dbus.plex_np + plex_new = self.dbus.get_now_playing() + if plex_current == plex_new: + continue + + + if self.last_updated and (now - self.last_updated < 5): + time.sleep(0.7) + continue + + RPC.client_id = self.client_id + # if RPC.client_id != self.client_id: + # try: + # RPC.close() + # except: + # pass + # RPC = Presence(self.client_id, pipe=0) + # RPC.connect() + + print(RPC.update( + details=plex_new.get('name'), + state=plex_new.get('details'), + large_image=plex_new.get('art', 'https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png'), + # large_image="https://codey.lol/images/cat.png", + # large_text=f"{rand_msg}" if rand_msg else None,k + large_text=lyrics_current_iteration if lyrics_current_iteration else None, + small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", + small_text=chosen_image.get('label'), + activity_type=ActivityType.WATCHING, + # start=plex_new.get('elapsed'), + # end=plex_new.get('duration')/1000, + )) + time.sleep(1) + continue + + + + request = requests.post(f'{self.api_url}/xc/', json=self.api_req_data, + headers={ + 'content-type': 'application/json; charset=utf-8' + }, timeout=(2, 10)) + request.raise_for_status() + data = request.json().get('response') + track_artist = data.get('artist') + track_title = data.get('title') + start_time = data.get('start') + end_time = data.get('end') + track = f"{data.get('artist')} - {data.get('title')}" + if self.last_updated and (now - self.last_updated < 5): + time.sleep(0.7) + continue + + if self.last_track and self.last_track == track: + lyr_start_idx = lyr_end_idx + lyr_end_idx += 1 + elif self.last_track: + lyr_start_idx = 0 + lyr_end_idx = 1 + + + + + try: + # rand_msg_request = requests.post(f'{self.api_url}/randmsg/', + # headers={ + # 'content-type': 'application/json; charset=utf-8' + # }, json={ + # 'short': True, + # }, timeout=(1, 2)) + # rand_msg = rand_msg_request.json().get('msg').strip() + # rand_msg = re.sub(r'(|)', '', re.sub(r'(
|
|
|\n|\r\n)', ' ', rand_msg)) + # if len(rand_msg) > 126: + # rand_msg = None + + if not self.last_track or not self.last_track == track: + lyrics = requests.post(f'{self.api_url}/lyric_search/', + headers={ + 'content-type': 'application/json; charset=utf-8', + }, + json={ + 'a': track_artist, + 's': track_title, + 'src': 'WEB-RADIO', + }, timeout=(2,10)) + lyrics.raise_for_status() + lyrics_content = lyrics.json().get("lyrics") + + lyrics_current_iteration = " / ".join(lyrics_content.split("
")[lyr_start_idx:lyr_end_idx]).strip() + if not lyrics_current_iteration.strip(): + lyr_start_idx = 0 + lyr_end_idx = 1 + + if len(lyrics_current_iteration) > 128: + lyrics_current_iteration = lyrics_current_iteration.split(" / ", maxsplit=1)[0].strip() + + + + except: + try: + lyrics_current_iteration = " / ".join(lyrics_content.split("
")[0]).strip() + except: + lyrics_content = lyrics_current_iteration = None + print("FAILED TO GET RANDMSG/LYRICS") + print(traceback.format_exc()) + + self.last_updated = now + self.last_track = track + + print(RPC.update( + details=track_title, + state=track_artist, + large_image=f"{self.album_art_url}?t={now}", + # large_image="https://codey.lol/images/cat.png", + # large_text=f"{rand_msg}" if rand_msg else None,k + large_text=lyrics_current_iteration if lyrics_current_iteration else None, + small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", + small_text=chosen_image.get('label'), + start=start_time, + end=end_time, + activity_type=ActivityType.LISTENING, + buttons=[ + { + "label": "Listen", + "url": "https://codey.lol/radio" + },] + )) + + # match chosen_image.get('image'): + # case "rooster": + # chosen_image = { + # 'image': "qu", + # 'label': "rubber duck :3" + # } + # case "qu": + # chosen_image = { + # 'image': "rooster", + # 'label': ":3", + # } + except: + print(traceback.format_exc()) + time.sleep(5) + continue + except: + print(traceback.format_exc()) + try: + self.loop() + except: + pass + + + +def __init__(): + discord_presence = DiscordPresence() + discord_presence.loop() + +if __name__ == "__main__": + __init__() + \ No newline at end of file