diff --git a/artist_genre_import.py b/artist_genre_import.py new file mode 100644 index 0000000..13e748c --- /dev/null +++ b/artist_genre_import.py @@ -0,0 +1,88 @@ +import os +import logging +import asyncio +import aiosqlite as sqlite3 + +csv_file_path: str = "artist_genre_imp.csv" # current directory +db_file_path: str = os.path.join( + "/usr/local/share", "sqlite_dbs", "artist_genre_map.db" +) +track_db_file_path: str = os.path.join( + "/usr/local/share", "sqlite_dbs", "track_file_map.db" +) +artist_genre: dict[str, str] = {} +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + + +async def process_csv(file_path: str) -> None: + """ + Load the CSV containing artist/genre pairs to memory, then to SQLite + Format: Artist,Genre + Allows for commas in artist name by splitting at final comma + """ + logging.info("Loading %s", file_path) + with open(file_path, "r", encoding="utf-8") as f: + lines = f.readlines() + logging.info("Read %s lines", len(lines)) + for line in lines: + split_line = line.strip().split(",") + genre = "".join(split_line[-1]).strip() + artist = ",".join(split_line[0:-1]).strip() + if artist in artist_genre: + continue # Already in dict, skip + artist_genre[artist] = genre + logging.info("Processing %s artist/genre pairs", len(artist_genre)) + async with sqlite3.connect(db_file_path, timeout=5) as _db: + for artist, genre in artist_genre.items(): + res = await _db.execute_insert( + "INSERT INTO artist_genre (artist, genre) VALUES(?, ?)", + ( + artist, + genre, + ), + ) + if not res: + logging.debug("Failed to insert %s", artist) + logging.debug("Inserted id# %s", res) + logging.info("Committing") + await _db.commit() + + +async def process_from_legacy(legacy_db_path: str) -> None: + """ + Load existing tagged artists from the db (track_file_map) + For migration to new db + """ + logging.info("SQLite: Connecting to %s and %s", db_file_path, legacy_db_path) + async with sqlite3.connect(db_file_path, timeout=5) as new_db: + logging.info("Connected to new db") + async with sqlite3.connect(track_db_file_path, timeout=30) as legacy_db: + logging.info("Connected to legacy db") + legacy_db.row_factory = sqlite3.Row + async with await legacy_db.execute( + "SELECT distinct(artist), genre FROM tracks WHERE genre != 'Untagged'" + ) as _cursor: + result = await _cursor.fetchall() + logging.info("Read %s items from legacy db", len(result)) + for result in result: + artist = result["artist"].strip() + genre = result["genre"].strip() + res = await new_db.execute_insert( + "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)", + (artist, genre), + ) + logging.info("Inserted id# %s", res) + logging.info("Committing to new_db") + await new_db.commit() + + +def __init__(): + loop = asyncio.new_event_loop() + # loop.create_task(process_csv(csv_file_path)) + loop.create_task(process_from_legacy(track_db_file_path)) + loop.run_forever() + + +if __name__ == "__main__": + __init__() diff --git a/catbox.py b/catbox.py index cf88896..e8a01cf 100644 --- a/catbox.py +++ b/catbox.py @@ -10,56 +10,65 @@ import json CATBOX_API_PATH = "https://catbox.moe/user/api.php" HEADERS = { - 'accept': '*/*', - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53', - 'Accept-Language': 'en-US,en;q=0.9,it;q=0.8,es;q=0.7', - 'referer': 'https://www.google.com/', - 'cookie': 'DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0' - } + "accept": "*/*", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.64 Safari/537.36 Edg/101.0.1210.53", + "Accept-Language": "en-US,en;q=0.9,it;q=0.8,es;q=0.7", + "referer": "https://www.google.com/", + "cookie": "DSID=AAO-7r4OSkS76zbHUkiOpnI0kk-X19BLDFF53G8gbnd21VZV2iehu-w_2v14cxvRvrkd_NjIdBWX7wUiQ66f-D8kOkTKD1BhLVlqrFAaqDP3LodRK2I0NfrObmhV9HsedGE7-mQeJpwJifSxdchqf524IMh9piBflGqP0Lg0_xjGmLKEQ0F4Na6THgC06VhtUG5infEdqMQ9otlJENe3PmOQTC_UeTH5DnENYwWC8KXs-M4fWmDADmG414V0_X0TfjrYu01nDH2Dcf3TIOFbRDb993g8nOCswLMi92LwjoqhYnFdf1jzgK0", +} +class Catbox: -class Catbox(): - - def generateRandomFileName(self, fileExt=''): + def generateRandomFileName(self, fileExt=""): if len(fileExt) < 1: - fileExt = 'db' + fileExt = "db" return f"{random.getrandbits(32)}.{fileExt}" - + def __init__(self): self.PROXIES = None try: - with open(os.path.join(os.path.expanduser("~"), ".catprox"), 'r') as proxfile: + with open( + os.path.join(os.path.expanduser("~"), ".catprox"), "r" + ) as proxfile: self.PROXIES = json.loads("".join(str(x) for x in proxfile.readlines())) except: pass - - def upload(self, filePath=''): + def upload(self, filePath=""): global CATBOX_API_PATH global HEADERS try: if len(filePath) == 0: return False - - if not(os.path.exists(filePath)): + + if not (os.path.exists(filePath)): print(f"Could not find {filePath}") - fileExt = '' + fileExt = "" if filePath.find(".") > 0: fileExt = "".join(filePath.split(".")[-1:]) - - with open(filePath, 'rb') as fileContents: - postData = { - 'reqtype': 'fileupload', - 'userhash': '' - } - r = requests.post(CATBOX_API_PATH, headers=HEADERS, proxies=self.PROXIES, data=postData, timeout=(2, 8), files={'fileToUpload': (self.generateRandomFileName(fileExt), fileContents)}) + with open(filePath, "rb") as fileContents: + postData = {"reqtype": "fileupload", "userhash": ""} + + r = requests.post( + CATBOX_API_PATH, + headers=HEADERS, + proxies=self.PROXIES, + data=postData, + timeout=(2, 8), + files={ + "fileToUpload": ( + self.generateRandomFileName(fileExt), + fileContents, + ) + }, + ) if r.status_code in [200, 301, 302]: return r.text.strip() diff --git a/discord_presence.py b/discord_presence.py index c811acf..b1ede33 100644 --- a/discord_presence.py +++ b/discord_presence.py @@ -9,99 +9,111 @@ import random import discord_presence_priv import catbox from setproctitle import setproctitle - +from typing import Optional from pypresence import Presence, ActivityType setproctitle("disc-presence") + class ImageUpload: def __init__(self) -> None: self.upload_uri = "https://api.codey.lol/misc/upload_activity_image" - + def upload(self, file: bytes) -> None: try: - r = requests.post(self.upload_uri, files= - {'image': file}) + r = requests.post(self.upload_uri, files={"image": file}) print(f"Response {r.status_code}:\n{r.text}") r.raise_for_status() except Exception as e: print(f"Exception: {str(e)}") + class DBus: def __init__(self) -> None: - self.session_bus = dbus.SessionBus() + self.session_bus = dbus.SessionBus() self.catbox = catbox.Catbox() self.uploader = ImageUpload() try: - self.player_dbus_proxy = self.session_bus.get_object('org.mpris.MediaPlayer2.playerctld', - '/org/mpris/MediaPlayer2') - self.player_interface = dbus.Interface(self.player_dbus_proxy, - 'org.freedesktop.DBus.Properties') + self.player_dbus_proxy = self.session_bus.get_object( + "org.mpris.MediaPlayer2.playerctld", "/org/mpris/MediaPlayer2" + ) + self.player_interface = dbus.Interface( + self.player_dbus_proxy, "org.freedesktop.DBus.Properties" + ) self.interface_metadata = None self.plex_last = None self.plex_np = { - 'name': None, - 'details': None, - 'art': None, - 'elapsed': None, - 'duration': None, + "name": None, + "details": None, + "art": None, + "elapsed": None, + "duration": None, } except: pass - def is_plexing(self) -> bool: try: - self.interface_metadata = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player').get('Metadata') - if "app.plex.tv" in self.interface_metadata.get('xesam:url'): + self.interface_metadata = self.player_interface.GetAll( + "org.mpris.MediaPlayer2.Player" + ).get("Metadata") + if "app.plex.tv" in self.interface_metadata.get("xesam:url"): return True print(f"WTF, NO PLEX?") except: print(traceback.format_exc()) return False - + def get_now_playing(self): try: - self.interface_details = self.player_interface.GetAll('org.mpris.MediaPlayer2.Player') - self.interface_metadata = self.interface_details.get('Metadata') - original_art = str(self.interface_metadata.get('mpris:artUrl', '')) + self.interface_details = self.player_interface.GetAll( + "org.mpris.MediaPlayer2.Player" + ) + self.interface_metadata = self.interface_details.get("Metadata") + original_art = str(self.interface_metadata.get("mpris:artUrl", "")) self.plex_np = { - 'name': str(" ".join(self.interface_metadata.get('xesam:artist'))), - 'details': f'{str(self.interface_metadata.get('xesam:album', ''))} - {str(self.interface_metadata.get('xesam:title'))}', + "name": str(" ".join(self.interface_metadata.get("xesam:artist"))), + "details": f"{str(self.interface_metadata.get('xesam:album', ''))} - {str(self.interface_metadata.get('xesam:title'))}", # 'elapsed': time.time() - int(self.interface_details.get('Position')/1000000), # 'duration': int(self.interface_metadata.get('mpris:length')/1000000), } - if not self.interface_metadata.get('xesam:album'): - self.plex_np['details'] = self.plex_np['details'][2:] # Fix for "'- ' prefix" - if not self.plex_last or\ - (self.plex_np.get('name') == self.plex_last.get('name') and self.plex_np.get('details') == self.plex_last.get('details')): + if not self.interface_metadata.get("xesam:album"): + self.plex_np["details"] = self.plex_np["details"][ + 2: + ] # Fix for "'- ' prefix" + if not self.plex_last or ( + self.plex_np.get("name") == self.plex_last.get("name") + and self.plex_np.get("details") == self.plex_last.get("details") + ): if original_art: original_art = original_art.split("file://", maxsplit=1)[1] bytes_art = None try: - with open(original_art, 'rb') as f: + with open(original_art, "rb") as f: bytes_art = f.read() if isinstance(bytes_art, bytes): # self.uploader.upload(bytes_art) - self.plex_np['art'] = 'https://api.codey.lol/misc/get_activity_image' + self.plex_np["art"] = ( + "https://api.codey.lol/misc/get_activity_image" + ) else: print(f"bytes art: {type(bytes_art)}") - self.plex_np['art'] = None + self.plex_np["art"] = None except: traceback.print_exc() - self.plex_np['art'] = None + self.plex_np["art"] = None else: # print("No original art for this file") - self.plex_np['art'] = None - self.plex_last = self.plex_np + self.plex_np["art"] = None + self.plex_last = self.plex_np return self.plex_np except: self.plex_np = { - 'name': None, - 'details': None, - 'art': None, - 'elapsed': None, - 'duration': None, + "name": None, + "details": None, + "art": None, + "elapsed": None, + "duration": None, } print(traceback.format_exc()) return False @@ -115,31 +127,35 @@ class DiscordPresence: self.client_id = discord_presence_priv.CLIENT_ID self.client_id_plex = discord_presence_priv.CLIENT_ID_PLEX self.api_key = discord_presence_priv.API_KEY - self.api_url = 'https://api.codey.lol' - self.album_art_url = 'https://api.codey.lol/radio/album_art' + self.api_url = "https://api.codey.lol" + self.album_art_url = "https://api.codey.lol/radio/album_art" self.api_req_data = { - 'bid': 0, - 'cmd': 'radio_metadata', - 'key': f'Bearer {self.api_key}', + "bid": 0, + "cmd": "radio_metadata", + "key": f"Bearer {self.api_key}", } + self.last_uuid: Optional[str] = None + def loop(self): try: lyr_start_idx = 0 - lyr_end_idx = 1 + lyr_end_idx = 1 rand_msg = None lyrics_content = None lyrics_current_iteration = None - chosen_image = random.choice([ - { - 'image': "qu", - 'label': "quietscheentchen", - }, - # { - # 'image': "rooster", - # 'label': ":3", - # } - ]) - RPC = Presence(self.client_id, pipe=0) + chosen_image = random.choice( + [ + { + "image": "qu", + "label": "quietscheentchen", + }, + # { + # 'image': "rooster", + # 'label': ":3", + # } + ] + ) + RPC = Presence(self.client_id, pipe=0) RPC.connect() while True: try: @@ -151,11 +167,10 @@ class DiscordPresence: if plex_current == plex_new: continue - if self.last_updated and (now - self.last_updated < 15): time.sleep(0.5) continue - + RPC.client_id = self.client_id # if RPC.client_id != self.client_id: # try:5 @@ -163,43 +178,60 @@ class DiscordPresence: # except: # pass # RPC = Presence(self.client_id, pipe=0) - # RPC.connect() - print(RPC.update( - details=plex_new.get('name', '\u2064') if plex_new.get('name') else plex_new.get('details'), - state=plex_new.get('details', None) if plex_new.get('details') else None, - large_image=plex_new.get('art', 'https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png'), - # large_image="https://codey.lol/images/cat.png", - # large_text=f"{rand_msg}" if rand_msg else None,k - large_text=lyrics_current_iteration if lyrics_current_iteration else None, - # small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", - # small_text=chosen_image.get('label'), - activity_type=ActivityType.WATCHING, - # start=plex_new.get('elapsed'), - # end=plex_new.get('duration')/1000, - )) + # RPC.connect() + print( + RPC.update( + details=( + plex_new.get("name", "\u2064") + if plex_new.get("name") + else plex_new.get("details") + ), + state=( + plex_new.get("details", None) + if plex_new.get("details") + else None + ), + large_image=plex_new.get( + "art", + "https://www.plex.tv/wp-content/themes/plex/assets/img/favicons/plex-192.png", + ), + # large_image="https://codey.lol/images/cat.png", + # large_text=f"{rand_msg}" if rand_msg else None,k + large_text=( + lyrics_current_iteration + if lyrics_current_iteration + else None + ), + # small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", + # small_text=chosen_image.get('label'), + activity_type=ActivityType.WATCHING, + # start=plex_new.get('elapsed'), + # end=plex_new.get('duration')/1000, + ) + ) time.sleep(1) continue - - - request = requests.post(f'{self.api_url}/radio/np', - headers={ - 'content-type': 'application/json; charset=utf-8' - }, timeout=(2, 10)) + request = requests.post( + f"{self.api_url}/radio/np", + headers={"content-type": "application/json; charset=utf-8"}, + timeout=(2, 10), + ) request.raise_for_status() data = request.json() - track = data.get('artistsong') - track_artist = data.get('artist')[0:127] - track_title = data.get('song')[0:127] - track_album = data.get('album', '')[0:127] - track_genre = data.get('genre', '') - track_id = data.get('id') - start_time = data.get('start') - end_time = data.get('end') - - if self.last_track == track: + track = data.get("artistsong") + track_artist = data.get("artist")[0:127] + track_title = data.get("song")[0:127] + track_album = data.get("album", "")[0:127] + track_genre = data.get("genre", "") + track_id = data.get("id") + track_uuid = data.get("uuid") + start_time = data.get("start") + end_time = data.get("end") + + if self.last_uuid == track_uuid: continue - + # if self.last_updated and (now - self.last_updated < 15): # time.sleep(0.7) # continue @@ -211,9 +243,6 @@ class DiscordPresence: # lyr_start_idx = 0 # lyr_end_idx = 1 - - - # try: # # rand_msg_request = requests.post(f'{self.api_url}/randmsg/', # # headers={ @@ -247,8 +276,6 @@ class DiscordPresence: # if len(lyrics_current_iteration) > 128: # lyrics_current_iteration = lyrics_current_iteration.split(" / ", maxsplit=1)[0].strip() - - # except: # try: # lyrics_current_iteration = " / ".join(lyrics_content.split("
")[0]).strip() @@ -259,38 +286,39 @@ class DiscordPresence: self.last_updated = now self.last_track = track - lyrics_current_iteration = None # disable + self.last_uuid = track_uuid + lyrics_current_iteration = None # disable + + print( + RPC.update( + details=track_title, + state=f"{track_artist[0:100]} [{track_genre}]"[0:127], + large_image=f"{self.album_art_url}?t={now}&track_id={track_id}", + # large_image="https://codey.lol/images/cat.png", + # large_text=f"{rand_msg}" if rand_msg else None,k + large_text=track_album if track_album else None, + small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", + small_text=chosen_image.get("label"), + start=start_time, + end=end_time, + activity_type=ActivityType.LISTENING, + buttons=[ + {"label": "Listen", "url": "https://codey.lol/radio"}, + ], + ) + ) - print(RPC.update( - details=track_title, - state=f"{track_artist[0:100]} [{track_genre}]"[0:127], - large_image=f"{self.album_art_url}?t={now}&track_id={track_id}", - # large_image="https://codey.lol/images/cat.png", - # large_text=f"{rand_msg}" if rand_msg else None,k - large_text=track_album if track_album else None, - # small_image=f"https://codey.lol/images/{chosen_image.get('image')}.png", - # small_text=chosen_image.get('label'), - start=start_time, - end=end_time, - activity_type=ActivityType.LISTENING, - buttons=[ - { - "label": "Listen", - "url": "https://codey.lol/radio" - },] - )) - # match chosen_image.get('image'): - # case "rooster": - # chosen_image = { - # 'image': "qu", - # 'label': "rubber duck :3" - # } - # case "qu": - # chosen_image = { - # 'image': "rooster", - # 'label': ":3", - # } + # case "rooster": + # chosen_image = { + # 'image': "qu", + # 'label': "rubber duck :3" + # } + # case "qu": + # chosen_image = { + # 'image': "rooster", + # 'label': ":3", + # } except: print(traceback.format_exc()) time.sleep(0.7) @@ -303,11 +331,10 @@ class DiscordPresence: pass - def __init__(): discord_presence = DiscordPresence() - discord_presence.loop() + discord_presence.loop() + if __name__ == "__main__": __init__() - \ No newline at end of file