#!/usr/bin/env python3.12 import time import traceback import requests import dbus import re import random import discord_presence_priv import catbox from setproctitle import setproctitle from typing import Optional from pypresence import Presence, ActivityType setproctitle("disc-presence") class ImageUpload: def __init__(self) -> None: self.upload_uri = "https://api.codey.lol/misc/upload_activity_image" def upload(self, file: bytes) -> None: try: r = requests.post(self.upload_uri, files={"image": file}) print(f"Response {r.status_code}:\n{r.text}") r.raise_for_status() except Exception as e: print(f"Exception: {str(e)}") class DBus: def __init__(self) -> None: self.session_bus = dbus.SessionBus() self.catbox = catbox.Catbox() self.uploader = ImageUpload() try: self.player_dbus_proxy = self.session_bus.get_object( "org.mpris.MediaPlayer2.playerctld", "/org/mpris/MediaPlayer2" ) self.player_interface = dbus.Interface( self.player_dbus_proxy, "org.freedesktop.DBus.Properties" ) self.interface_metadata = None self.plex_last = None self.plex_np = { "name": None, "details": None, "art": None, "elapsed": None, "duration": None, } except: pass def is_plexing(self) -> bool: try: self.interface_metadata = self.player_interface.GetAll( "org.mpris.MediaPlayer2.Player" ).get("Metadata") if "app.plex.tv" in self.interface_metadata.get("xesam:url"): return True print(f"WTF, NO PLEX?") except: print(traceback.format_exc()) return False def get_now_playing(self): try: self.interface_details = self.player_interface.GetAll( "org.mpris.MediaPlayer2.Player" ) self.interface_metadata = self.interface_details.get("Metadata") original_art = str(self.interface_metadata.get("mpris:artUrl", "")) self.plex_np = { "name": str(" ".join(self.interface_metadata.get("xesam:artist"))), "details": f"{str(self.interface_metadata.get('xesam:album', ''))} - {str(self.interface_metadata.get('xesam:title'))}", # 'elapsed': time.time() - int(self.interface_details.get('Position')/1000000), # 'duration': int(self.interface_metadata.get('mpris:length')/1000000), } if not self.interface_metadata.get("xesam:album"): self.plex_np["details"] = self.plex_np["details"][ 2: ] # Fix for "'- ' prefix" if not self.plex_last or ( self.plex_np.get("name") == self.plex_last.get("name") and self.plex_np.get("details") == self.plex_last.get("details") ): if original_art: original_art = original_art.split("file://", maxsplit=1)[1] bytes_art = None try: with open(original_art, "rb") as f: bytes_art = f.read() if isinstance(bytes_art, bytes): # self.uploader.upload(bytes_art) self.plex_np["art"] = ( "https://api.codey.lol/misc/get_activity_image" ) else: print(f"bytes art: {type(bytes_art)}") self.plex_np["art"] = None except: traceback.print_exc() self.plex_np["art"] = None else: # print("No original art for this file") self.plex_np["art"] = None self.plex_last = self.plex_np return self.plex_np except: self.plex_np = { "name": None, "details": None, "art": None, "elapsed": None, "duration": None, } print(traceback.format_exc()) return False class DiscordPresence: def __init__(self): self.dbus = DBus() self.last_updated = None self.last_track = None self.client_id = discord_presence_priv.CLIENT_ID self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX self.api_key = discord_presence_priv.API_KEY self.api_url = "https://api.codey.lol" self.album_art_url = "https://api.codey.lol/radio/album_art" self.api_req_data = { "bid": 0, "cmd": "radio_metadata", "key": f"Bearer {self.api_key}", } self.last_uuid: Optional[str] = None def loop(self): try: lyr_start_idx = 0 lyr_end_idx = 1 rand_msg = None lyrics_content = None lyrics_current_iteration = None chosen_image = random.choice( [ { "image": "qu", "label": "quietscheentchen", }, # { # 'image': "rooster", # 'label': ":3", # } ] ) RPC = Presence(self.client_id, pipe=0) RPC.connect() while True: try: now = time.time() plex = self.dbus.is_plexing() if plex: plex_current = self.dbus.plex_np plex_new = self.dbus.get_now_playing() if plex_current == plex_new: continue if self.last_updated and (now - self.last_updated < 15): time.sleep(0.5) continue RPC.client_id = self.client_id # if RPC.client_id != self.client_id: # try:5 # RPC.close() # except: # pass # RPC = Presence(self.client_id, pipe=0) # RPC.connect() print( RPC.update( details=( plex_new.get("name", "\u2064") if plex_new.get("name") else plex_new.get("details") ), state=( plex_new.get("details", None) if plex_new.get("details") else None ), large_image=plex_new.get( "art", "https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png", ), # large_image="https://codey.lol/images/cat.png", # large_text=f"{rand_msg}" if rand_msg else None,k large_text=( lyrics_current_iteration if lyrics_current_iteration else None ), # small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", # small_text=chosen_image.get('label'), activity_type=ActivityType.WATCHING, # start=plex_new.get('elapsed'), # end=plex_new.get('duration')/1000, ) ) time.sleep(1) continue request = requests.post( f"{self.api_url}/radio/np", headers={"content-type": "application/json; charset=utf-8"}, timeout=(2, 10), ) request.raise_for_status() data = request.json() track = data.get("artistsong") track_artist = data.get("artist")[0:127] track_title = data.get("song")[0:127] track_album = data.get("album", "")[0:127] track_genre = data.get("genre", "") track_id = data.get("id") track_uuid = data.get("uuid") start_time = data.get("start") end_time = data.get("end") if self.last_uuid == track_uuid: continue # if self.last_updated and (now - self.last_updated < 15): # time.sleep(0.7) # continue # if self.last_track and self.last_track == track: # lyr_start_idx = lyr_end_idx # lyr_end_idx += 1 # elif self.last_track: # lyr_start_idx = 0 # lyr_end_idx = 1 # try: # # rand_msg_request = requests.post(f'{self.api_url}/randmsg/', # # headers={ # # 'content-type': 'application/json; charset=utf-8' # # }, json={ # # 'short': True, # # }, timeout=(1, 2)) # # rand_msg = rand_msg_request.json().get('msg').strip() # # rand_msg = re.sub(r'(|)', '', re.sub(r'(
|
|
|\n|\r\n)', ' ', rand_msg)) # # if len(rand_msg) > 126: # # rand_msg = None # if not self.last_track or not self.last_track == track: # lyrics = requests.post(f'{self.api_url}/lyric/search', # headers={ # 'content-type': 'application/json; charset=utf-8', # }, # json={ # 'a': track_artist, # 's': track_title, # 'src': 'WEB-RADIO', # }, timeout=(2,10)) # lyrics.raise_for_status() # lyrics_content = lyrics.json().get("lyrics") # lyrics_current_iteration = " / ".join(lyrics_content.split("
")[lyr_start_idx:lyr_end_idx]).strip() # if not lyrics_current_iteration.strip(): # lyr_start_idx = 0 # lyr_end_idx = 1 # if len(lyrics_current_iteration) > 128: # lyrics_current_iteration = lyrics_current_iteration.split(" / ", maxsplit=1)[0].strip() # except: # try: # lyrics_current_iteration = " / ".join(lyrics_content.split("
")[0]).strip() # except: # lyrics_content = lyrics_current_iteration = None # print("FAILED TO GET RANDMSG/LYRICS") # print(traceback.format_exc()) self.last_updated = now self.last_track = track self.last_uuid = track_uuid lyrics_current_iteration = None # disable print( RPC.update( details=track_title, state=f"{track_artist[0:100]} [{track_genre}]"[0:127], large_image=f"{self.album_art_url}?t={now}&track_id={track_id}", # large_image="https://codey.lol/images/cat.png", # large_text=f"{rand_msg}" if rand_msg else None,k large_text=track_album if track_album else None, small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", small_text=chosen_image.get("label"), start=start_time, end=end_time, activity_type=ActivityType.LISTENING, buttons=[ {"label": "Listen", "url": "https://codey.lol/radio"}, ], ) ) # match chosen_image.get('image'): # case "rooster": # chosen_image = { # 'image': "qu", # 'label': "rubber duck :3" # } # case "qu": # chosen_image = { # 'image': "rooster", # 'label': ":3", # } except: print(traceback.format_exc()) time.sleep(0.7) continue except: print(traceback.format_exc()) try: self.loop() except: pass def __init__(): discord_presence = DiscordPresence() discord_presence.loop() if __name__ == "__main__": __init__()