#!/usr/bin/env python3.12
"""Publish "now playing" information to Discord Rich Presence.

Two sources are polled in a loop:

* an MPRIS player exposed through ``playerctld`` on the D-Bus session bus;
  when its current URL contains ``app.plex.tv`` the Plex metadata is shown;
* otherwise the radio metadata endpoint at ``api.codey.lol``, together with
  one line of the current track's lyrics as the large-image tooltip.
"""

import random
import re
import time
import traceback

import dbus
import requests
from pypresence import ActivityType, Presence
from setproctitle import setproctitle

import catbox
import discord_presence_priv

setproctitle("disc-presence")

MPRIS_BUS_NAME = 'org.mpris.MediaPlayer2.playerctld'
MPRIS_OBJECT_PATH = '/org/mpris/MediaPlayer2'
MPRIS_PLAYER_INTERFACE = 'org.mpris.MediaPlayer2.Player'
DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'

JSON_HEADERS = {'content-type': 'application/json; charset=utf-8'}
HTTP_TIMEOUT = (2, 10)  # (connect, read) seconds, as used by the original requests

MIN_UPDATE_INTERVAL_SECONDS = 5   # minimum gap between two presence updates
THROTTLE_POLL_SECONDS = 0.7       # sleep while waiting for that gap to elapse
ERROR_BACKOFF_SECONDS = 5         # sleep after a failed iteration / connection

# The original code dropped tooltips longer than 128 characters.
# NOTE(review): presumably Discord's limit for this field - confirm.
LARGE_TEXT_LIMIT = 128

# Small-image choices; one is picked at random per connection.
PRESENCE_IMAGES = [
    {
        'image': "qu",
        'label': "quietscheentchen",
    },
]

# NOTE(review): in the file as received the lyric separator was a raw line
# break inside the string literal; neighbouring (commented-out) code suggests
# it was an HTML <br> tag originally. Both forms are accepted here - confirm
# against what the lyric_search endpoint actually returns.
_LYRIC_LINE_BREAK = re.compile(r'<br\s*/?>|\r?\n')


def _split_lyrics(lyrics_content) -> list[str]:
    """Return the non-blank, stripped lines of *lyrics_content* (``[]`` if empty)."""
    if not lyrics_content:
        return []
    pieces = _LYRIC_LINE_BREAK.split(str(lyrics_content))
    return [piece.strip() for piece in pieces if piece.strip()]


def _lyric_line(lines: list[str], index: int) -> str | None:
    """Return line *index* of *lines*, wrapping around; ``None`` when empty."""
    if not lines:
        return None
    return lines[index % len(lines)][:LARGE_TEXT_LIMIT]


class DBus:
    """Reads MPRIS metadata from ``playerctld`` on the session bus."""

    def __init__(self) -> None:
        self.session_bus = dbus.SessionBus()
        self.catbox = catbox.Catbox()
        # Assign defaults before touching the bus so every attribute exists
        # even when the proxy cannot be created yet.
        self.player_dbus_proxy = None
        self.player_interface = None
        self.interface_details = None
        self.plex_last = None
        self.plex_np = self._empty_now_playing()
        try:
            self._connect()
        except Exception:
            # Best effort: the connection is retried on the next metadata read.
            print(traceback.format_exc())

    @staticmethod
    def _empty_now_playing() -> dict:
        """Return the placeholder used when nothing is known about the player."""
        return {
            'name': None,
            'details': None,
            'art': None,
        }

    def _connect(self) -> None:
        """Create the proxy and properties interface for the MPRIS player."""
        self.player_dbus_proxy = self.session_bus.get_object(
            MPRIS_BUS_NAME, MPRIS_OBJECT_PATH)
        self.player_interface = dbus.Interface(
            self.player_dbus_proxy, DBUS_PROPERTIES_INTERFACE)

    def _read_metadata(self):
        """Fetch the player's ``Metadata`` property and cache it on the instance."""
        if self.player_interface is None:
            self._connect()
        properties = self.player_interface.GetAll(MPRIS_PLAYER_INTERFACE)
        self.interface_details = properties.get('Metadata') or {}
        return self.interface_details

    def _resolve_art(self, art_url: str):
        """Turn the player's art URL into something usable as a presence image.

        ``file://`` URLs are uploaded through the catbox client (presumably
        returning a hosted URL - confirm against ``catbox.Catbox.upload``);
        any other non-empty URL is passed through unchanged.
        """
        if art_url.startswith("file://"):
            return self.catbox.upload(art_url.removeprefix("file://"))
        return art_url or None

    def is_plexing(self) -> bool:
        """Return True when the player's current URL contains ``app.plex.tv``."""
        try:
            metadata = self._read_metadata()
            return "app.plex.tv" in str(metadata.get('xesam:url', ''))
        except Exception:
            print(traceback.format_exc())
            return False

    def get_now_playing(self) -> dict | bool:
        """Return ``{'name', 'details', 'art'}`` for the current track.

        The art is only resolved (and, for local files, uploaded) when the
        track differs from the previous successful call; otherwise the
        previous art value is reused. Returns ``False`` when the metadata
        cannot be read, in which case ``plex_np`` is reset to the placeholder.
        """
        try:
            metadata = self._read_metadata()
            artists = metadata.get('xesam:artist') or []
            album = str(metadata.get('xesam:album', ''))
            title = str(metadata.get('xesam:title', ''))
            now_playing = {
                'name': " ".join(str(artist) for artist in artists) or None,
                'details': f"{album} {title}".strip() or None,
            }

            previous = self.plex_last
            unchanged = (
                previous is not None
                and previous.get('name') == now_playing['name']
                and previous.get('details') == now_playing['details']
            )
            if unchanged and previous.get('art'):
                now_playing['art'] = previous['art']
            else:
                now_playing['art'] = self._resolve_art(
                    str(metadata.get('mpris:artUrl', '')))

            self.plex_np = now_playing
            self.plex_last = now_playing
            return now_playing
        except Exception:
            self.plex_np = self._empty_now_playing()
            print(traceback.format_exc())
            return False


class DiscordPresence:
    """Keeps the Discord Rich Presence in sync with Plex or the radio stream."""

    def __init__(self):
        self.dbus = DBus()
        self.last_updated = None   # time.time() of the last presence update
        self.last_track = None     # "artist - title" of the last radio track
        self.client_id = discord_presence_priv.CLIENT_ID
        self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX
        self.api_key = discord_presence_priv.API_KEY
        self.api_url = 'https://api.codey.lol'
        self.album_art_url = 'https://codey.lol/images/radio_art.jpg'
        self.api_req_data = {
            'bid': 0,
            'cmd': 'radio_metadata',
            'key': f'Bearer {self.api_key}',
        }

    def loop(self):
        """Connect to Discord and publish updates forever.

        A failed connection (or any error escaping the publish loop) is
        logged and retried after ``ERROR_BACKOFF_SECONDS``. ``KeyboardInterrupt``
        and ``SystemExit`` are not caught, so the process can be stopped.
        """
        while True:
            rpc = None
            try:
                rpc = Presence(self.client_id, pipe=0)
                rpc.connect()
                self._publish_forever(rpc)
            except Exception:
                print(traceback.format_exc())
            finally:
                self._close_quietly(rpc)
            time.sleep(ERROR_BACKOFF_SECONDS)

    @staticmethod
    def _close_quietly(rpc) -> None:
        """Close *rpc* if it exists; cleanup failures are deliberately ignored."""
        if rpc is None:
            return
        try:
            rpc.close()
        except Exception:
            # The connection may never have been established; nothing to do.
            pass

    def _fetch_lyric_lines(self, artist, title) -> list[str]:
        """Fetch the lyrics for a radio track; returns ``[]`` on any failure."""
        try:
            response = requests.post(
                f'{self.api_url}/lyric_search/',
                headers=JSON_HEADERS,
                json={
                    'a': artist,
                    's': title,
                    'src': 'WEB-RADIO',
                },
                timeout=HTTP_TIMEOUT,
            )
            response.raise_for_status()
            return _split_lyrics(response.json().get("lyrics"))
        except Exception:
            print("FAILED TO GET RANDMSG/LYRICS")
            print(traceback.format_exc())
            return []

    def _publish_forever(self, rpc) -> None:
        """Poll both sources and push presence updates through *rpc*."""
        lyric_lines: list[str] = []
        lyric_index = 0
        lyric_text = None
        plex_published = None  # last Plex payload actually sent to Discord
        chosen_image = random.choice(PRESENCE_IMAGES)
        small_image = f"https://codey.lol/images/{chosen_image.get('image')}.png"
        small_text = chosen_image.get('label')
        buttons = [{"label": "Listen", "url": "https://codey.lol/radio"}]

        while True:
            try:
                now = time.time()
                # Throttle first, so neither D-Bus nor the HTTP API is polled
                # while an update could not be published anyway.
                if self.last_updated and (now - self.last_updated
                                          < MIN_UPDATE_INTERVAL_SECONDS):
                    time.sleep(THROTTLE_POLL_SECONDS)
                    continue

                if self.dbus.is_plexing():
                    now_playing = self.dbus.get_now_playing()
                    if not now_playing or now_playing == plex_published:
                        # Nothing new (or metadata unreadable): wait, don't spin.
                        time.sleep(THROTTLE_POLL_SECONDS)
                        continue
                    rpc.client_id = self.client_id
                    print(rpc.update(
                        details=now_playing.get('name'),
                        state=now_playing.get('details'),
                        large_image=now_playing.get('art'),
                        large_text=lyric_text if lyric_text else None,
                        small_image=small_image,
                        small_text=small_text,
                        activity_type=ActivityType.WATCHING,
                        buttons=buttons,
                    ))
                    plex_published = dict(now_playing)
                    self.last_updated = now
                    continue

                # Back on the radio: make sure Plex is re-announced next time.
                plex_published = None

                response = requests.post(
                    f'{self.api_url}/xc/',
                    json=self.api_req_data,
                    headers=JSON_HEADERS,
                    timeout=HTTP_TIMEOUT,
                )
                response.raise_for_status()
                data = response.json().get('response')
                track_artist = data.get('artist')
                track_title = data.get('title')
                start_time = data.get('start')
                end_time = data.get('end')
                track = f"{track_artist} - {track_title}"

                if track != self.last_track:
                    # New track: start at the first line of fresh lyrics.
                    lyric_index = 0
                    lyric_lines = self._fetch_lyric_lines(track_artist, track_title)
                else:
                    # Same track: show the next line on every update.
                    lyric_index += 1
                lyric_text = _lyric_line(lyric_lines, lyric_index)

                # Recorded before the update so a failing update is throttled
                # and does not trigger a lyric re-fetch on the next pass.
                self.last_updated = now
                self.last_track = track
                print(rpc.update(
                    details=track_title,
                    state=track_artist,
                    # Timestamp query parameter keeps the image from being cached.
                    large_image=f"{self.album_art_url}?t={now}",
                    large_text=lyric_text if lyric_text else None,
                    small_image=small_image,
                    small_text=small_text,
                    start=start_time,
                    end=end_time,
                    activity_type=ActivityType.LISTENING,
                    buttons=buttons,
                ))
            except Exception:
                print(traceback.format_exc())
                time.sleep(ERROR_BACKOFF_SECONDS)
                continue


def __init__():
    """Script entry point: build the presence updater and run it forever."""
    discord_presence = DiscordPresence()
    discord_presence.loop()


if __name__ == "__main__":
    __init__()