#!/usr/bin/env python3.13
"""Publish "now playing" information as Discord Rich Presence.

Two sources are polled, in priority order:

1. The MPRIS player exposed by ``playerctld`` on the session bus, when its
   ``xesam:url`` metadata contains ``app.plex.tv``.
2. Otherwise, the radio station/track pair returned by ``query_ff()``,
   enriched with metadata fetched from ``https://api.codey.lol``.
"""
import logging
import os
import random
import re
import sys
import time
import traceback
from typing import Optional
from urllib.parse import unquote

import dbus
import requests
from pypresence import ActivityType, Presence
from setproctitle import setproctitle

import catbox
import discord_presence_priv
from ff_brotab import query_ff

setproctitle("disc-presence")

# Image shown for Plex when no cover art could be resolved.
PLEX_FALLBACK_IMAGE = (
    "https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png"
)
# URL handed to Discord as the Plex cover art.
ACTIVITY_IMAGE_URL = "https://api.codey.lol/misc/get_activity_image"
# Minimum number of seconds between two Plex presence updates.
MIN_UPDATE_INTERVAL = 15
# Seconds to wait between two polls of the sources.
POLL_INTERVAL = 1.0
# Seconds to wait after a failed poll or a failed connection attempt.
ERROR_BACKOFF = 5.0
# Consecutive failed polls after which the Discord connection is rebuilt.
MAX_CONSECUTIVE_FAILURES = 5
# Maximum length applied to the text fields sent to Discord.
FIELD_LIMIT = 127


class ImageUpload:
    """Upload an image to the activity-image endpoint."""

    def __init__(self) -> None:
        self.upload_uri = "https://api.codey.lol/misc/upload_activity_image"

    def upload(self, file: bytes) -> None:
        """POST *file* as the ``image`` form field.

        Best effort: the response is printed and any failure is printed
        rather than raised.
        """
        try:
            # A timeout keeps a stalled server from blocking the poll loop.
            r = requests.post(
                self.upload_uri, files={"image": file}, timeout=(2, 10)
            )
            print(f"Response {r.status_code}:\n{r.text}")
            r.raise_for_status()
        except Exception as e:
            print(f"Exception: {str(e)}")


class DBus:
    """Read the current track from the ``playerctld`` MPRIS player."""

    def __init__(self) -> None:
        # State is initialised before any D-Bus call so that every attribute
        # exists even when the player lookup below fails.
        self.interface_metadata = None
        self.interface_details = None
        self.plex_last = None
        self.plex_np = self._empty_now_playing()
        self.player_dbus_proxy = None
        self.player_interface = None

        self.session_bus = dbus.SessionBus()
        self.catbox = catbox.Catbox()
        self.uploader = ImageUpload()
        self._connect_player()

    @staticmethod
    def _empty_now_playing() -> dict:
        """Return the placeholder used when nothing is known about the track."""
        return {
            "name": None,
            "details": None,
            "art": None,
            "elapsed": None,
            "duration": None,
        }

    def _connect_player(self) -> bool:
        """Bind the properties interface of the player; return success."""
        try:
            self.player_dbus_proxy = self.session_bus.get_object(
                "org.mpris.MediaPlayer2.playerctld", "/org/mpris/MediaPlayer2"
            )
            self.player_interface = dbus.Interface(
                self.player_dbus_proxy, "org.freedesktop.DBus.Properties"
            )
            return True
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            self.player_dbus_proxy = None
            self.player_interface = None
            return False

    def is_plexing(self) -> bool:
        """Return True when the player's ``xesam:url`` contains ``app.plex.tv``.

        Any D-Bus failure is logged and reported as False.
        """
        # Retry the lookup lazily if it failed earlier (e.g. at start-up).
        if self.player_interface is None and not self._connect_player():
            return False
        try:
            self.interface_metadata = self.player_interface.GetAll(
                "org.mpris.MediaPlayer2.Player"
            ).get("Metadata")
            url = ""
            if self.interface_metadata:
                # Default to "" so a missing key does not raise on ``in``.
                url = str(self.interface_metadata.get("xesam:url", ""))
            if "app.plex.tv" in url:
                return True
            logging.debug("Plex not found")
            return False
        except Exception as e:
            logging.debug("Exception: %s", str(e))
            traceback.print_exc()
            return False

    @staticmethod
    def _resolve_art(art_url: str) -> Optional[str]:
        """Return the art URL for Discord if *art_url* is a readable local file.

        Only ``file://`` URLs are handled; anything else yields None.
        """
        if not art_url.startswith("file://"):
            return None
        # File URLs percent-encode characters such as spaces.
        path = unquote(art_url.removeprefix("file://"))
        try:
            with open(path, "rb") as f:
                f.read()
        except OSError:
            traceback.print_exc()
            return None
        # NOTE: uploading the bytes via ImageUpload is currently disabled, so
        # the URL below is returned without pushing this file first.
        return ACTIVITY_IMAGE_URL

    def get_now_playing(self):
        """Return a dict describing the current track, or False on failure.

        The dict has the keys ``name`` (artists joined by spaces),
        ``details`` (``"album - title"``, or just the title when there is no
        album) and ``art`` (a URL or None). It is also stored in
        ``self.plex_np`` and ``self.plex_last``.
        """
        try:
            self.interface_details = self.player_interface.GetAll(
                "org.mpris.MediaPlayer2.Player"
            )
            self.interface_metadata = self.interface_details.get("Metadata")
            metadata = self.interface_metadata

            artists = metadata.get("xesam:artist") or []
            album = str(metadata.get("xesam:album") or "")
            title = str(metadata.get("xesam:title") or "")
            now_playing = {
                "name": " ".join(str(artist) for artist in artists),
                # Built conditionally so there is no " - " prefix to strip.
                "details": f"{album} - {title}" if album else title,
                "art": None,
            }

            previous = self.plex_last
            unchanged = (
                previous is not None
                and previous.get("name") == now_playing["name"]
                and previous.get("details") == now_playing["details"]
            )
            if unchanged and previous.get("art"):
                # Same track and art already resolved: reuse it.
                now_playing["art"] = previous.get("art")
            else:
                # New track, or art was not available on the previous poll.
                now_playing["art"] = self._resolve_art(
                    str(metadata.get("mpris:artUrl", ""))
                )

            self.plex_np = now_playing
            self.plex_last = now_playing
            return now_playing
        except Exception:
            self.plex_np = self._empty_now_playing()
            traceback.print_exc()
            return False


class DiscordPresence:
    """Poll the Plex and radio sources and push changes to Discord."""

    def __init__(self):
        self.dbus = DBus()
        self.radio_station = None
        self.last_updated = None
        self.last_track = None
        self.last_track_title = None
        self.client_id = discord_presence_priv.CLIENT_ID
        self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX
        self.api_key = discord_presence_priv.API_KEY
        self.api_url = "https://api.codey.lol"
        self.album_art_url = "https://api.codey.lol/radio/album_art"
        self.api_req_data = {
            "bid": 0,
            "cmd": "radio_metadata",
            "key": f"Bearer {self.api_key}",
        }
        self.last_uuid: Optional[str] = None
        # Last Plex payload that was actually sent to Discord.
        self._last_published_plex: Optional[dict] = None

    def loop(self):
        """Run forever: connect to Discord, poll, and reconnect on failure."""
        chosen_image = random.choice(
            [
                {
                    "image": "qu",
                    "label": "quietscheentchen",
                },
            ]
        )
        # Iterative reconnect: a failed connection is retried after a delay
        # instead of recursing into loop() again.
        while True:
            rpc = None
            try:
                rpc = Presence(self.client_id, pipe=0)
                rpc.connect()
                self._poll_until_broken(rpc, chosen_image)
            except Exception as e:
                logging.debug("Exception: %s", str(e))
                traceback.print_exc()
            finally:
                if rpc is not None:
                    try:
                        rpc.close()
                    except Exception as e:
                        # Closing a connection that never opened or already
                        # died is expected to fail; nothing to recover.
                        logging.debug("Exception: %s", str(e))
            time.sleep(ERROR_BACKOFF)

    def _poll_until_broken(self, rpc, chosen_image) -> None:
        """Poll the sources until too many consecutive polls have failed."""
        failures = 0
        while failures < MAX_CONSECUTIVE_FAILURES:
            try:
                self._poll_once(rpc, chosen_image)
            except Exception as e:
                failures += 1
                logging.debug("Exception: %s", str(e))
                traceback.print_exc()
                time.sleep(ERROR_BACKOFF)
                continue
            failures = 0
            time.sleep(POLL_INTERVAL)

    def _poll_once(self, rpc, chosen_image) -> None:
        """Check Plex first and fall back to the radio source."""
        now = time.time()
        if self.dbus.is_plexing():
            self._update_plex(rpc, now)
        else:
            self._update_radio(rpc, now, chosen_image)

    def _update_plex(self, rpc, now: float) -> None:
        """Publish the Plex track if it differs from the last published one."""
        plex_new = self.dbus.get_now_playing()
        # get_now_playing() returns False when the metadata could not be read.
        if not plex_new or plex_new == self._last_published_plex:
            return
        if self.last_updated and (now - self.last_updated < MIN_UPDATE_INTERVAL):
            # Too soon; nothing is marked as published, so a later poll
            # retries this update.
            return

        name = plex_new.get("name")
        details = plex_new.get("details")
        print(
            rpc.update(
                details=name or details,
                state=details or None,
                large_image=plex_new.get("art") or PLEX_FALLBACK_IMAGE,
                activity_type=ActivityType.WATCHING,
            )
        )
        self._last_published_plex = dict(plex_new)
        self.last_updated = now
        # Forget the radio markers so the radio presence is re-sent once
        # Plex stops playing.
        self.last_track_title = None
        self.last_uuid = None

    def _update_radio(self, rpc, now: float, chosen_image: dict) -> None:
        """Publish the radio track when the reported title has changed."""
        ff_brotab = query_ff()
        if not ff_brotab:
            return
        (station, _track) = ff_brotab
        tab_title = _track.strip()
        self.radio_station = station
        if self.last_track_title == tab_title:
            return

        response = requests.post(
            f"{self.api_url}/radio/np",
            # Passed as params so the station name is URL-encoded.
            params={"station": self.radio_station},
            headers={"content-type": "application/json; charset=utf-8"},
            timeout=(2, 10),
        )
        response.raise_for_status()
        data = response.json()

        track_uuid = data.get("uuid")
        if track_uuid is not None and self.last_uuid == track_uuid:
            print("Track has not changed")
            # Remember the title so the API is not queried again until the
            # reported title changes.
            self.last_track_title = tab_title
            return

        # ``or ""`` also covers fields the API returns as null.
        track_artist = str(data.get("artist") or "")[:FIELD_LIMIT]
        track_title = str(data.get("song") or "")[:FIELD_LIMIT]
        track_album = str(data.get("album") or "")[:FIELD_LIMIT]
        track_genre = data.get("genre") or ""
        track_id = data.get("id")

        print(
            rpc.update(
                details=track_title,
                state=f"{track_artist[0:100]} [{track_genre}]"[:FIELD_LIMIT],
                # ``t`` changes on every update so the image URL is unique.
                large_image=f"{self.album_art_url}?t={now}&track_id={track_id}",
                large_text=track_album if track_album else None,
                small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png",
                small_text=chosen_image.get("label"),
                start=data.get("start"),
                end=data.get("end"),
                activity_type=ActivityType.LISTENING,
                buttons=[
                    {"label": "Listen", "url": "https://codey.lol/radio"},
                ],
            )
        )
        # State is recorded only after a successful update, so a failed
        # request or update is retried on the next poll.
        self.last_updated = now
        self.last_track = data.get("artistsong")
        self.last_track_title = tab_title
        self.last_uuid = track_uuid
        # Forget the Plex marker so the Plex presence is re-sent when Plex
        # starts playing again.
        self._last_published_plex = None


def main() -> None:
    """Create the presence updater and run its loop."""
    discord_presence = DiscordPresence()
    discord_presence.loop()


def __init__():
    """Original entry-point name, kept for existing callers."""
    main()


if __name__ == "__main__":
    main()