#!/usr/bin/env python3.12
"""
Radio Utils
"""

import datetime
import logging
import time
import traceback

from aiohttp import ClientSession, ClientTimeout

import gpt


class RadioUtil:
    """
    Radio helper utilities: duration formatting, AI song blurbs, and
    outbound song-change webhooks.
    """

    def __init__(self, constants) -> None:
        """
        Store the constants object and prepare the GPT client and webhook map.
        Args:
            constants: object exposing GPT_WEBHOOK and SFM_WEBHOOK; it is
                also handed to gpt.GPT unchanged
        Returns:
            None
        """
        self.constants = constants
        self.gpt = gpt.GPT(self.constants)
        self.webhooks = {
            'gpt': {
                'hook': self.constants.GPT_WEBHOOK,
            },
            'sfm': {
                'hook': self.constants.SFM_WEBHOOK,
            },
        }

    def duration_conv(self, s: int | float) -> str:
        """
        Convert duration given in seconds to hours, minutes, and seconds (h:m:s)
        Args:
            s (int|float): seconds to convert
        Returns:
            str
        """
        # str(timedelta) appends ".ffffff" for fractional seconds; keep only
        # the part before the dot so the fraction is dropped.
        return str(datetime.timedelta(seconds=s)).split(".", maxsplit=1)[0]

    async def get_ai_song_info(self, artist: str, song: str) -> str | None:
        """
        Get AI Song Info
        Args:
            artist (str)
            song (str)
        Returns:
            str|None: the completion, or None when GPT gave no (or an empty)
                response; that case is logged at CRITICAL level
        """
        response = await self.gpt.get_completion(
            prompt=f"I am going to listen to {song} by {artist}.")
        if not response:
            logging.critical("No response received from GPT?")
            return None
        return response

    def _now_playing_payload(self, track: dict) -> dict:
        """Build the "Now Playing" webhook payload for the given track."""
        # NOTE(review): the username/embeds layout presumably follows the
        # Discord webhook format -- confirm against the receiving service.
        return {
            'username': 'serious.FM',
            "embeds": [{
                "title": "Now Playing",
                "description": f"## {track['song']}\nby\n## {track['artist']}",
                "color": 0x30c56f,
                "thumbnail": {
                    # The trailing time.time() makes every URL unique
                    # (presumably to defeat thumbnail caching -- confirm).
                    "url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{time.time()}",
                },
                "fields": [
                    {
                        "name": "Duration",
                        "value": self.duration_conv(track['duration']),
                        "inline": True,
                    },
                    {
                        "name": "Filetype",
                        # Text after the last dot; raises IndexError when the
                        # path contains no dot at all.
                        "value": track['file_path'].rsplit(".", maxsplit=1)[1],
                        "inline": True,
                    },
                    {
                        "name": "Higher Res",
                        "value": "[stream/icecast](https://relay.sfm.codey.lol/aces.ogg) || [web player](https://codey.lol/radio)",
                        "inline": True,
                    },
                ],
            }],
        }

    async def _post_webhook(self, hook_url: str, payload: dict) -> None:
        """POST payload as JSON to hook_url, raising on an HTTP error status."""
        async with ClientSession() as session:
            async with session.post(
                hook_url,
                json=payload,
                timeout=ClientTimeout(connect=5, sock_read=5),
                headers={
                    'content-type': 'application/json; charset=utf-8',
                },
            ) as request:
                request.raise_for_status()

    async def webhook_song_change(self, track: dict) -> None:
        """
        Handles Song Change Outbounds (Webhooks)
        Sends the "Now Playing" embed to the sfm hook, then the AI feedback
        embed to the gpt hook. Best-effort: any failure is printed as a
        traceback and never propagated to the caller.
        Args:
            track (dict): must provide 'song', 'artist', 'id', 'duration'
                and 'file_path'
        Returns:
            None
        """
        try:
            # First, send track info
            await self._post_webhook(
                self.webhooks['sfm'].get('hook'),
                self._now_playing_payload(track),
            )

            # Next, AI feedback
            ai_response = await self.get_ai_song_info(track['artist'], track['song'])
            if not ai_response:
                # get_ai_song_info already logged the failure; there is
                # nothing to post, so skip the AI webhook instead of
                # crashing on None.strip().
                return
            hook_data = {
                'username': 'GPT',
                "embeds": [{
                    "title": "AI Feedback",
                    "color": 0x35d0ff,
                    "description": ai_response.strip(),
                }],
            }
            await self._post_webhook(self.webhooks['gpt'].get('hook'), hook_data)
        except Exception:
            # Deliberate catch-all: a webhook failure must not break the
            # caller, so report it and carry on.
            traceback.print_exc()