341 lines
14 KiB
Python
341 lines
14 KiB
Python
#!/usr/bin/env python3.12
|
|
|
|
import time
|
|
import traceback
|
|
import requests
|
|
import dbus
|
|
import re
|
|
import random
|
|
import discord_presence_priv
|
|
import catbox
|
|
from setproctitle import setproctitle
|
|
from typing import Optional
|
|
from pypresence import Presence, ActivityType
|
|
|
|
# Rename the running process to "disc-presence" (runs at import time).
setproctitle("disc-presence")
|
|
|
|
|
|
class ImageUpload:
    """Upload an activity image to the remote upload endpoint."""

    def __init__(self, timeout: float = 10.0) -> None:
        """Configure the uploader.

        Args:
            timeout: Seconds to wait for the server before giving up. Without
                a timeout ``requests`` may block indefinitely on a stalled
                connection.
        """
        # Endpoint that receives the image as a multipart form upload.
        self.upload_uri = "https://api.codey.lol/misc/upload_activity_image"
        self.timeout = timeout

    def upload(self, file: bytes) -> None:
        """POST *file* to ``upload_uri`` as the multipart field ``image``.

        Best effort: the response status and body are printed, and request
        failures (connection errors, timeouts, HTTP error statuses) are
        reported on stdout instead of being raised.

        Args:
            file: Raw image bytes to upload.
        """
        try:
            r = requests.post(
                self.upload_uri,
                files={"image": file},
                timeout=self.timeout,
            )
            print(f"Response {r.status_code}:\n{r.text}")
            r.raise_for_status()
        except requests.RequestException as e:
            print(f"Exception: {str(e)}")
|
|
|
|
|
|
class DBus:
    """Read "now playing" data from the playerctld MPRIS player on the session bus."""

    def __init__(self) -> None:
        """Connect to the session bus and look up the player properties interface.

        The lookup is best effort: when the playerctld proxy cannot be
        obtained, the error is printed and ``player_interface`` stays ``None``.
        """
        self.session_bus = dbus.SessionBus()
        self.catbox = catbox.Catbox()
        self.uploader = ImageUpload()

        # State is initialised before touching the bus so these attributes
        # always exist, even when the proxy lookup below fails.
        self.interface_details = None
        self.interface_metadata = None
        self.plex_last = None
        self.plex_np = {
            "name": None,
            "details": None,
            "art": None,
            "elapsed": None,
            "duration": None,
        }
        self.player_dbus_proxy = None
        self.player_interface = None
        try:
            self.player_dbus_proxy = self.session_bus.get_object(
                "org.mpris.MediaPlayer2.playerctld", "/org/mpris/MediaPlayer2"
            )
            self.player_interface = dbus.Interface(
                self.player_dbus_proxy, "org.freedesktop.DBus.Properties"
            )
        except dbus.DBusException:
            # Report the failure instead of hiding it; callers see
            # player_interface as None.
            traceback.print_exc()

    def is_plexing(self) -> bool:
        """Return True when the player's ``xesam:url`` contains ``app.plex.tv``.

        Refreshes ``interface_metadata`` as a side effect. Any failure while
        querying the player is printed and reported as ``False``.
        """
        if self.player_interface is None:
            return False
        try:
            self.interface_metadata = self.player_interface.GetAll(
                "org.mpris.MediaPlayer2.Player"
            ).get("Metadata")
            # Metadata or its URL entry may be absent; treat that as
            # "not Plex" rather than raising on a None membership test.
            url = str((self.interface_metadata or {}).get("xesam:url", ""))
            if "app.plex.tv" in url:
                return True
            print("WTF, NO PLEX?")
        except Exception:
            print(traceback.format_exc())
        return False

    def get_now_playing(self):
        """Build the now-playing summary from the player's MPRIS metadata.

        Updates ``interface_details``, ``interface_metadata``, ``plex_np`` and
        ``plex_last``.

        Returns:
            The new ``plex_np`` dict with ``name`` (artists joined by spaces)
            and ``details`` ("album - title", or just the title when there is
            no album). An ``art`` key is added only when artwork was resolved
            on this call. Returns ``False`` when the metadata could not be
            read; ``plex_np`` is then reset to all-``None`` values.
        """
        try:
            self.interface_details = self.player_interface.GetAll(
                "org.mpris.MediaPlayer2.Player"
            )
            self.interface_metadata = self.interface_details.get("Metadata")
            metadata = self.interface_metadata

            original_art = str(metadata.get("mpris:artUrl", ""))
            album = str(metadata.get("xesam:album") or "")
            title = str(metadata.get("xesam:title") or "")
            # A missing artist list must not abort the whole lookup.
            artists = metadata.get("xesam:artist") or []

            self.plex_np = {
                "name": " ".join(str(artist) for artist in artists),
                # Only prepend the album when there is one, so no stray
                # " - " prefix has to be sliced off afterwards.
                "details": f"{album} - {title}" if album else title,
                # 'elapsed': time.time() - int(self.interface_details.get('Position')/1000000),
                # 'duration': int(self.interface_metadata.get('mpris:length')/1000000),
            }

            # NOTE(review): artwork is resolved only on the first call or when
            # the track is unchanged since the previous call; a freshly changed
            # track gets no "art" key until the next poll. This mirrors the
            # existing behaviour - confirm it is intended and not inverted.
            if not self.plex_last or (
                self.plex_np.get("name") == self.plex_last.get("name")
                and self.plex_np.get("details") == self.plex_last.get("details")
            ):
                self.plex_np["art"] = self._resolve_art(original_art)

            self.plex_last = self.plex_np
            return self.plex_np
        except Exception:
            self.plex_np = {
                "name": None,
                "details": None,
                "art": None,
                "elapsed": None,
                "duration": None,
            }
            print(traceback.format_exc())
            return False

    def _resolve_art(self, art_url: str) -> Optional[str]:
        """Return the activity-image URL when *art_url* is a readable local file.

        Returns ``None`` for an empty URL, a URL that is not ``file://``, or a
        file that cannot be read (the read error is printed).
        """
        prefix = "file://"
        if not art_url.startswith(prefix):
            return None
        try:
            with open(art_url.removeprefix(prefix), "rb") as f:
                bytes_art = f.read()
        except OSError:
            traceback.print_exc()
            return None
        # Upload of the artwork is currently disabled:
        # self.uploader.upload(bytes_art)
        return "https://api.codey.lol/misc/get_activity_image"
|
|
|
|
|
|
class DiscordPresence:
    """Publish the current Plex or web-radio track through a pypresence ``Presence``."""

    # Seconds to wait between polls when there is nothing new to publish.
    POLL_INTERVAL = 1.0
    # Seconds to wait after an error inside a single poll.
    ERROR_DELAY = 0.7
    # Seconds to wait before reconnecting after the RPC session fails.
    RECONNECT_DELAY = 5.0
    # A Plex push is postponed while last_updated is younger than this (seconds).
    PLEX_THROTTLE = 15
    # Image used for Plex when the now-playing dict carries no "art" key.
    PLEX_FALLBACK_IMAGE = (
        "https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png"
    )
    # Candidates for the small corner image on the radio presence.
    SMALL_IMAGES = [
        {"image": "qu", "label": "quietscheentchen"},
    ]

    def __init__(self):
        """Create the D-Bus reader and load credentials from ``discord_presence_priv``."""
        self.dbus = DBus()
        self.last_updated = None
        self.last_track = None
        self.client_id = discord_presence_priv.CLIENT_ID
        # Stored but not referenced elsewhere in this class.
        self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX
        self.api_key = discord_presence_priv.API_KEY
        self.api_url = "https://api.codey.lol"
        self.album_art_url = "https://api.codey.lol/radio/album_art"
        # Stored but not sent by any request in this class.
        self.api_req_data = {
            "bid": 0,
            "cmd": "radio_metadata",
            "key": f"Bearer {self.api_key}",
        }
        self.last_uuid: Optional[str] = None
        # Snapshot of the last Plex payload handed to the RPC client.
        self.last_plex = None

    def loop(self):
        """Run the updater forever, reconnecting after a failed session.

        Retries iteratively with a delay instead of recursing, so repeated
        failures (for example a failing ``connect()``) can neither exhaust the
        stack nor spin at full speed. ``KeyboardInterrupt`` is not caught.
        """
        while True:
            try:
                self._run_session()
            except Exception:
                print(traceback.format_exc())
                time.sleep(self.RECONNECT_DELAY)

    def _run_session(self):
        """Connect one RPC client and poll until the connection attempt fails."""
        chosen_image = random.choice(self.SMALL_IMAGES)
        rpc = Presence(self.client_id, pipe=0)
        rpc.connect()
        while True:
            try:
                delay = self._poll_once(rpc, chosen_image)
            except Exception:
                print(traceback.format_exc())
                delay = self.ERROR_DELAY
            # Every path sleeps, so an unchanged track never busy-loops.
            time.sleep(delay)

    def _poll_once(self, rpc, chosen_image) -> float:
        """Publish Plex when it is playing, otherwise the radio; return the next delay."""
        now = time.time()
        if self.dbus.is_plexing():
            return self._publish_plex(rpc, now)
        return self._publish_radio(rpc, now, chosen_image)

    def _publish_plex(self, rpc, now: float) -> float:
        """Push the Plex now-playing data when it differs from the last push."""
        plex_new = self.dbus.get_now_playing()
        if not plex_new:
            # get_now_playing() returns False (and prints the error) on failure.
            return self.ERROR_DELAY
        # Compare against what was actually pushed, so an update postponed by
        # the throttle below is still delivered on a later poll.
        if plex_new == self.last_plex:
            return self.POLL_INTERVAL
        # NOTE(review): last_updated is written only by the radio path, so this
        # only postpones a Plex push that closely follows a radio push -
        # confirm whether Plex pushes should throttle each other as well.
        if self.last_updated and (now - self.last_updated < self.PLEX_THROTTLE):
            return 0.5

        # Recorded before pushing: a payload that fails is not retried until
        # the track data changes.
        self.last_plex = dict(plex_new)
        # Force the radio presence to be re-published once Plex stops.
        self.last_uuid = None

        rpc.client_id = self.client_id
        print(
            rpc.update(
                # Fall back to the details line when there is no artist name.
                details=plex_new.get("name") or plex_new.get("details"),
                state=plex_new.get("details") or None,
                large_image=plex_new.get("art", self.PLEX_FALLBACK_IMAGE),
                large_text=None,
                activity_type=ActivityType.WATCHING,
            )
        )
        return self.POLL_INTERVAL

    def _publish_radio(self, rpc, now: float, chosen_image: dict) -> float:
        """Fetch the radio's current track and push it when its uuid changed."""
        response = requests.post(
            f"{self.api_url}/radio/np",
            headers={"content-type": "application/json; charset=utf-8"},
            timeout=(2, 10),
        )
        response.raise_for_status()
        track = self._parse_track(response.json())

        if self.last_uuid == track["uuid"]:
            return self.POLL_INTERVAL

        self.last_updated = now
        self.last_track = track["track"]
        self.last_uuid = track["uuid"]
        # Force the Plex presence to be re-published when Plex resumes.
        self.last_plex = None

        print(
            rpc.update(
                details=track["title"],
                state=f"{track['artist'][0:100]} [{track['genre']}]"[0:127],
                # The timestamp makes the art URL differ on every push.
                large_image=f"{self.album_art_url}?t={now}&track_id={track['id']}",
                large_text=track["album"] if track["album"] else None,
                small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png",
                small_text=chosen_image.get("label"),
                start=track["start"],
                end=track["end"],
                activity_type=ActivityType.LISTENING,
                buttons=[
                    {"label": "Listen", "url": "https://codey.lol/radio"},
                ],
            )
        )
        return self.POLL_INTERVAL

    @staticmethod
    def _parse_track(data: dict) -> dict:
        """Extract the presence fields from the ``/radio/np`` JSON response.

        Text fields are cut to 127 characters; missing or ``None`` text fields
        become empty strings instead of raising.
        """
        return {
            "track": data.get("artistsong"),
            "artist": (data.get("artist") or "")[0:127],
            "title": (data.get("song") or "")[0:127],
            "album": (data.get("album") or "")[0:127],
            "genre": data.get("genre") or "",
            "id": data.get("id"),
            "uuid": data.get("uuid"),
            "start": data.get("start"),
            "end": data.get("end"),
        }
|
|
|
|
|
|
def __init__():
    """Build the presence client and hand control to its polling loop."""
    DiscordPresence().loop()


if __name__ == "__main__":
    # Start the updater only when this file is executed as a script.
    __init__()
|