243 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			243 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3.12
 | |
| 
 | |
| import logging
 | |
| import traceback
 | |
| import os
 | |
| import aiosqlite as sqlite3
 | |
| import time
 | |
| import random
 | |
| import asyncio
 | |
| import regex
 | |
| import music_tag
 | |
| from . import radio_util
 | |
| from .constructors import ValidRadioNextRequest, ValidRadioReshuffleRequest, ValidRadioQueueShiftRequest,\
 | |
|     ValidRadioQueueRemovalRequest, ValidRadioSongRequest,\
 | |
|         ValidRadioQueueGetRequest, RadioException
 | |
| from uuid import uuid4 as uuid
 | |
| from typing import Optional, LiteralString
 | |
| from fastapi import FastAPI, BackgroundTasks, Request, Response, HTTPException
 | |
| from fastapi.responses import RedirectResponse
 | |
| from aiohttp import ClientSession, ClientTimeout
 | |
| # pylint: disable=bare-except, broad-exception-caught, invalid-name
 | |
| 
 | |
| 
 | |
| """
 | |
| TODO: 
 | |
|     - Radio request typeahead
 | |
| """
 | |
|     
 | |
| class Radio(FastAPI):
 | |
|     """Radio Endpoints"""
 | |
|     def __init__(self, app: FastAPI, my_util, constants, glob_state) -> None:  # pylint: disable=super-init-not-called
 | |
|         self.app = app
 | |
|         self.util = my_util
 | |
|         self.constants = constants
 | |
|         self.radio_util = radio_util.RadioUtil(self.constants)
 | |
|         self.glob_state = glob_state
 | |
| 
 | |
|         self.endpoints: dict = {
 | |
|             "radio/np": self.radio_now_playing,
 | |
|             "radio/request": self.radio_request,
 | |
|             "radio/get_queue": self.radio_get_queue,
 | |
|             "radio/skip": self.radio_skip,
 | |
|             "radio/queue_shift": self.radio_queue_shift,
 | |
|             "radio/reshuffle": self.radio_reshuffle,
 | |
|             "radio/queue_remove": self.radio_queue_remove,
 | |
|             "radio/ls._next_": self.radio_get_next,        
 | |
|         }
 | |
|         
 | |
|         for endpoint, handler in self.endpoints.items():
 | |
|             app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
 | |
|                               include_in_schema=False) # change include_in_schema to False
 | |
|         
 | |
|         # NOTE: Not in loop because method is GET for this endpoint            
 | |
|         app.add_api_route("/radio/album_art", self.album_art_handler, methods=["GET"],
 | |
|                           include_in_schema=True)    
 | |
|         asyncio.get_event_loop().run_until_complete(self.radio_util.load_playlist())
 | |
|         asyncio.get_event_loop().run_until_complete(self.radio_util._ls_skip())          
 | |
|     
 | |
|     async def radio_skip(self, data: ValidRadioNextRequest, request: Request) -> bool:
 | |
|         """
 | |
|         Skip to the next track in the queue, or to uuid specified in skipTo if provided
 | |
|         """
 | |
|         try:
 | |
|             if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
 | |
|                 raise HTTPException(status_code=403, detail="Unauthorized")
 | |
|             if data.skipTo:
 | |
|                 (x, _) = self.radio_util.get_queue_item_by_uuid(data.skipTo)
 | |
|                 self.radio_util.active_playlist = self.radio_util.active_playlist[x:]
 | |
|                 if not self.radio_util.active_playlist:
 | |
|                     await self.radio_util.load_playlist()
 | |
|             return await self.radio_util._ls_skip()
 | |
|         except Exception as e:
 | |
|             traceback.print_exc()
 | |
|             return False
 | |
|                 
 | |
|     
 | |
|     async def radio_reshuffle(self, data: ValidRadioReshuffleRequest, request: Request) -> dict:
 | |
|         """
 | |
|         Reshuffle the play queue
 | |
|         """
 | |
|         if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
 | |
|             raise HTTPException(status_code=403, detail="Unauthorized")
 | |
|         
 | |
|         random.shuffle(self.radio_util.active_playlist)
 | |
|         return {
 | |
|             'ok': True
 | |
|         }
 | |
|             
 | |
|     
 | |
|     async def radio_get_queue(self, request: Request, limit: Optional[int] = 15_000) -> dict:
 | |
|         """
 | |
|         Get current play queue, up to limit [default: 15k]
 | |
|         """
 | |
|         
 | |
|         queue_out: list[dict] = []
 | |
|         for x, item in enumerate(self.radio_util.active_playlist[0:limit]):
 | |
|             queue_out.append({
 | |
|                 'pos': x,
 | |
|                 'id': item.get('id'),
 | |
|                 'uuid': item.get('uuid'),
 | |
|                 'artist': item.get('artist'),
 | |
|                 'song': item.get('song'),
 | |
|                 'artistsong': item.get('artistsong'),
 | |
|                 'duration': item.get('duration'),
 | |
|             })
 | |
|         return {
 | |
|             'items': queue_out
 | |
|         }
 | |
|         
 | |
|     async def radio_queue_shift(self, data: ValidRadioQueueShiftRequest, request: Request) -> dict:
 | |
|         """Shift position of a UUID within the queue [currently limited to playing next or immediately]"""
 | |
|         if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
 | |
|             raise HTTPException(status_code=403, detail="Unauthorized")
 | |
|         
 | |
|         (x, item) = self.radio_util.get_queue_item_by_uuid(data.uuid)
 | |
|         self.radio_util.active_playlist.pop(x)        
 | |
|         self.radio_util.active_playlist.insert(0, item)
 | |
|         if not data.next:
 | |
|             await self.radio_util._ls_skip()
 | |
|         return {
 | |
|             'ok': True,
 | |
|         }
 | |
|         
 | |
|     async def radio_queue_remove(self, data: ValidRadioQueueRemovalRequest, request: Request) -> dict:
 | |
|         """Remove an item from the current play queue"""
 | |
|         if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
 | |
|             raise HTTPException(status_code=403, detail="Unauthorized")
 | |
|         
 | |
|         (x, found_item) = self.radio_util.get_queue_item_by_uuid(data.uuid)
 | |
|         if not found_item:
 | |
|             return {
 | |
|                 'ok': False,
 | |
|                 'err': 'UUID not found in play queue',
 | |
|             }
 | |
|         self.radio_util.active_playlist.pop(x)
 | |
|         return {
 | |
|             'ok': True,
 | |
|         }
 | |
|         
 | |
|     async def album_art_handler(self, request: Request, track_id: Optional[int] = None) -> bytes:
 | |
|         """
 | |
|         Get album art, optional parameter track_id may be specified.
 | |
|         Otherwise, current track album art will be pulled.
 | |
|         """
 | |
|         try:
 | |
|             if not track_id:
 | |
|                 track_id = self.radio_util.now_playing.get('id')
 | |
|             logging.debug("Seeking album art with trackId: %s", track_id)
 | |
|             album_art: Optional[bytes] = await self.radio_util.get_album_art(track_id=track_id)
 | |
|             if not album_art:
 | |
|                 return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg",
 | |
|                                         status_code=302)                
 | |
|             return Response(content=album_art,
 | |
|                             media_type="image/png")
 | |
|         except Exception as e:
 | |
|             traceback.print_exc()
 | |
|             return RedirectResponse(url="https://codey.lol/images/radio_art_default.jpg",
 | |
|                                         status_code=302)       
 | |
|             
 | |
|     async def radio_now_playing(self, request: Request) -> dict:
 | |
|         """Get currently playing track info"""
 | |
|         ret_obj: dict = {**self.radio_util.now_playing}
 | |
|         cur_elapsed: int = self.radio_util.now_playing.get('elapsed', -1)
 | |
|         cur_duration: int = self.radio_util.now_playing.get('duration', 999999)
 | |
|         try:
 | |
|             ret_obj['elapsed'] = int(time.time()) - ret_obj['start']
 | |
|         except KeyError:
 | |
|             traceback.print_exc()
 | |
|             ret_obj['elapsed'] = 0
 | |
|         ret_obj.pop('file_path')
 | |
|         return ret_obj
 | |
|     
 | |
|     
 | |
|     async def radio_get_next(self, data: ValidRadioNextRequest, request: Request,
 | |
|                              background_tasks: BackgroundTasks) -> Optional[dict]:
 | |
|         """
 | |
|         Get next track
 | |
|         Track will be removed from the queue in the process.
 | |
|         """
 | |
|         if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
 | |
|                 raise HTTPException(status_code=403, detail="Unauthorized")
 | |
|         if not isinstance(self.radio_util.active_playlist, list) or not self.radio_util.active_playlist:
 | |
|             await self.radio_util.load_playlist()
 | |
|             await self.radio_util._ls_skip()
 | |
|             return
 | |
|         next = self.radio_util.active_playlist.pop(0)
 | |
|         if not isinstance(next, dict):
 | |
|             logging.critical("next is of type: %s, reloading playlist...", type(next))
 | |
|             await self.radio_util.load_playlist()
 | |
|             await self.radio_util._ls_skip()
 | |
|             return
 | |
|         
 | |
|         duration: int = next['duration']
 | |
|         time_started: int = int(time.time())
 | |
|         time_ends: int = int(time_started + duration) 
 | |
|         
 | |
|         if len(self.radio_util.active_playlist) > 1:
 | |
|             self.radio_util.active_playlist.append(next) # Push to end of playlist
 | |
|         else:
 | |
|             await self.radio_util.load_playlist()
 | |
|             
 | |
|         self.radio_util.now_playing = next
 | |
|         next['start'] = time_started
 | |
|         next['end'] = time_ends
 | |
|         try:
 | |
|             background_tasks.add_task(self.radio_util.webhook_song_change, next)
 | |
|         except Exception as e:
 | |
|             traceback.print_exc()
 | |
|         try:
 | |
|             if not await self.radio_util.get_album_art(file_path=next['file_path']):
 | |
|                 album_art = await self.radio_util.get_album_art(file_path=next['file_path'])
 | |
|                 await self.radio_util.cache_album_art(next['id'], album_art)
 | |
|         except:
 | |
|             traceback.print_exc()
 | |
|         return next
 | |
| 
 | |
|         
 | |
|     async def radio_request(self, data: ValidRadioSongRequest, request: Request) -> Response:
 | |
|         """Song request handler"""
 | |
|         if not self.util.check_key(path=request.url.path, req_type=4, key=data.key):
 | |
|                 raise HTTPException(status_code=403, detail="Unauthorized")
 | |
|         artistsong: str = data.artistsong
 | |
|         artist: str = data.artist
 | |
|         song: str = data.song
 | |
|         if artistsong and (artist or song):
 | |
|             return {
 | |
|                 'err': True,
 | |
|                 'errorText': 'Invalid request',
 | |
|             }
 | |
|         if not artistsong and (not artist or not song):
 | |
|             return {
 | |
|                 'err': True,
 | |
|                 'errorText': 'Invalid request',
 | |
|             }       
 | |
|             
 | |
|         search: bool = await self.radio_util.search_playlist(artistsong=artistsong,
 | |
|                                             artist=artist,
 | |
|                                             song=song)
 | |
|         if data.alsoSkip:
 | |
|             await self.radio_util._ls_skip()
 | |
|         return {
 | |
|             'result': search
 | |
|             }      |