2025-02-10 20:29:57 -05:00
import logging
import traceback
import time
2025-02-11 20:01:07 -05:00
import regex
2025-02-16 06:53:41 -05:00
from regex import Pattern
2025-02-10 20:29:57 -05:00
import datetime
2025-02-11 20:01:07 -05:00
import os
2025-02-10 20:29:57 -05:00
import gpt
2025-02-11 11:26:20 -05:00
from aiohttp import ClientSession , ClientTimeout
2025-02-11 20:01:07 -05:00
import aiosqlite as sqlite3
2025-02-16 13:54:28 -05:00
from typing import Union , Optional , LiteralString , Iterable
2025-02-11 20:01:07 -05:00
from uuid import uuid4 as uuid
2025-03-04 08:11:55 -05:00
from endpoints . constructors import RadioException
2025-02-11 20:01:07 -05:00
2025-02-16 06:53:41 -05:00
# Matches any run of two or more whitespace characters. Used when loading the
# playlist to squeeze repeated spacing in track metadata down to one space.
double_space: Pattern = regex.compile(r'\s{2,}')
2025-02-10 20:29:57 -05:00
class RadioUtil :
2025-02-15 21:09:33 -05:00
"""
Radio Utils
"""
2025-02-10 20:29:57 -05:00
def __init__(self, constants) -> None:
    """
    Initialise radio state
    Args:
        constants: Object providing LS_URI, GPT_WEBHOOK and SFM_WEBHOOK;
            it is also handed to gpt.GPT
    Returns:
        None
    """
    self.constants = constants
    self.gpt = gpt.GPT(self.constants)
    self.ls_uri: str = self.constants.LS_URI
    # SQLite extensions loaded by search_playlist before it runs its
    # editdist3() query
    self.sqlite_exts: list[str] = [
        '/home/api/api/solibs/spellfix1.cpython-311-x86_64-linux-gnu.so',
    ]
    # Path of the SQLite track database (despite the "playlist" name)
    self.active_playlist_path: Union[str, LiteralString] = os.path.join(
        "/usr/local/share", "sqlite_dbs", "track_file_map.db",
    )
    self.active_playlist_name = "default"  # not used
    # In-memory play queue; index 0 is where search_playlist inserts
    self.active_playlist: list[dict] = []
    # Placeholder metadata until a real track is set
    self.now_playing: dict = {
        'artist': 'N/A',
        'song': 'N/A',
        'album': 'N/A',
        'genre': 'N/A',
        'artistsong': 'N/A - N/A',
        'duration': 0,
        'start': 0,
        'end': 0,
        'file_path': None,
        'id': None,
    }
    # Outbound webhook targets, keyed by purpose
    self.webhooks: dict = {
        hook_name: {'hook': hook_url}
        for hook_name, hook_url in (
            ('gpt', self.constants.GPT_WEBHOOK),
            ('sfm', self.constants.SFM_WEBHOOK),
        )
    }
2025-02-11 20:01:07 -05:00
2025-02-15 21:09:33 -05:00
def duration_conv(self,
                  s: Union[int, float]) -> str:
    """
    Format a duration in seconds as hours, minutes and seconds (h:m:s),
    discarding any fractional-second part
    Args:
        s (Union[int, float]): seconds to convert
    Returns:
        str
    """
    elapsed = datetime.timedelta(seconds=s)
    # str(timedelta) appends ".ffffff" for sub-second values; keep only
    # what precedes the first dot
    whole, _, _fraction = str(elapsed).partition(".")
    return whole
2025-02-11 20:01:07 -05:00
2025-02-16 13:54:28 -05:00
async def trackdb_typeahead(self, query: str) -> Optional[list[str]]:
    """
    Typeahead lookup of "Artist - Song" strings in the track DB
    Args:
        query (str): Text to match anywhere within "artist - song"
    Returns:
        Optional[list[str]]: Up to 30 matches; None when query is empty
    """
    if not query:
        return None
    db_query: str = (
        'SELECT DISTINCT(LOWER(TRIM(artist) || " - " || TRIM(song))), '
        '(TRIM(artist) || " - " || TRIM(song)) as artistsong FROM tracks WHERE '
        'artistsong LIKE ? LIMIT 30'
    )
    # Wrap in wildcards so the LIKE matches the query as a substring
    db_params: tuple[str] = (f"%{query}%",)
    async with sqlite3.connect(self.active_playlist_path,
                               timeout=1) as _db:
        _db.row_factory = sqlite3.Row
        async with _db.execute(db_query, db_params) as _cursor:
            rows: Iterable[sqlite3.Row] = await _cursor.fetchall()
            suggestions = []
            for row in rows:
                suggestions.append(str(row['artistsong']))
            return suggestions
2025-02-15 21:09:33 -05:00
async def search_playlist(self, artistsong: Optional[str] = None,
                          artist: Optional[str] = None,
                          song: Optional[str] = None) -> bool:
    """
    Search for track, add it up next in play queue if found
    Args:
        artistsong (Optional[str]): "Artist - Song" combo to search (must not be combined with artist/song)
        artist (Optional[str]): Artist to search (requires song; must not be combined with artistsong)
        song (Optional[str]): Song to search (requires artist; must not be combined with artistsong)
    Returns:
        bool: True if a track was queued, False if nothing matched or the search failed
    Raises:
        RadioException: If artistsong is combined with artist/song, or if no usable query is given
    """
    if artistsong and (artist or song):
        raise RadioException("Cannot search using combination provided")
    if not artistsong and (not artist or not song):
        raise RadioException("No query provided")
    try:
        # Closest match by edit distance over "artist song", capped at 410
        search_query: str = (
            'SELECT id, artist, song, (artist || " - " || song) AS artistsong, '
            'album, genre, file_path, duration FROM tracks '
            'WHERE editdist3((lower(artist) || " " || lower(song)), (? || " " || ?)) '
            '<= 410 ORDER BY editdist3((lower(artist) || " " || lower(song)), ?) ASC LIMIT 1'
        )
        if artistsong:
            # An artistsong without the " - " separator fails to unpack; the
            # resulting ValueError is reported by the handler below.
            artistsong_split: list = artistsong.split(" - ", maxsplit=1)
            (search_artist, search_song) = tuple(artistsong_split)
        else:
            search_artist = artist
            search_song = song
            artistsong = f"{search_artist} - {search_song}"
        search_params = (search_artist.lower(), search_song.lower(), artistsong.lower(),)
        async with sqlite3.connect(self.active_playlist_path,
                                   timeout=2) as db_conn:
            # Extensions must be loaded before the editdist3() query runs
            await db_conn.enable_load_extension(True)
            for ext in self.sqlite_exts:
                await db_conn.load_extension(ext)
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(search_query, search_params) as db_cursor:
                result: Optional[sqlite3.Row | bool] = await db_cursor.fetchone()
                if not result or not isinstance(result, sqlite3.Row):
                    return False
                queue_item: dict = {
                    'id': result['id'],
                    'uuid': str(uuid().hex),
                    'artist': result['artist'].strip(),
                    'song': result['song'].strip(),
                    'artistsong': result['artistsong'].strip(),
                    # Queue items built by load_playlist carry 'album', and
                    # the song-change webhook reads it; keep the shapes aligned.
                    'album': result['album'],
                    'genre': result['genre'],
                    'file_path': result['file_path'],
                    'duration': result['duration'],
                }
                # Front of the queue == up next
                self.active_playlist.insert(0, queue_item)
                return True
    except Exception as e:
        logging.critical("search_playlist:: Search error occurred: %s", str(e))
        traceback.print_exc()
        return False
2025-02-14 16:07:24 -05:00
async def load_playlist(self) -> None:
    """
    Load Playlist

    Empties the in-memory queue, then refills it from the track DB using
    the currently selected query. Failures are printed, not raised.
    Args:
        None
    Returns:
        None
    """
    try:
        logging.info("Loading playlist...")
        self.active_playlist.clear()

        # UNRESTRICTED
        # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
        # GROUP BY artistdashsong ORDER BY RANDOM()'

        # LIMITED TO ONE / SMALL SUBSET OF GENRES
        # db_query = 'SELECT distinct(artist || " - " || song) AS artistdashsong, id, artist, song, album, genre, file_path, duration FROM tracks\
        # WHERE (artist LIKE "%bad omens%") AND (NOT song LIKE "%(live%") ORDER BY artist DESC, album ASC, song ASC'

        limited_genres: tuple[str, ...] = (
            "metalcore", "rock", "pop punk", "math rock", "punk rock",
            "metal", "punk", "electronic", "nu metal", "EDM",
            "post-hardcore", "pop rock", "experimental", "post-punk",
            "death metal", "electronicore", "hard rock", "psychedelic rock",
            "grunge", "house", "dubstep", "hardcore", "hair metal",
            "horror punk", "folk punk", "breakcore", "post-rock",
            "deathcore", "hardcore punk", "synthwave", "trap", "indie pop",
            "dnb",
        )
        genre_filter: str = " OR ".join(
            f'genre LIKE "%{genre}%"' for genre in limited_genres
        )
        playlist_queries: dict[str, str] = {
            # LIMITED GENRES
            'genres': (
                'SELECT distinct(LOWER(TRIM(artist)) || " - " || LOWER(TRIM(song))), '
                '(TRIM(artist) || " - " || TRIM(song)) AS artistdashsong, '
                'id, artist, song, album, genre, file_path, duration FROM tracks '
                f'WHERE ({genre_filter}) '
                'GROUP BY artistdashsong ORDER BY RANDOM()'
            ),
            # LIMITED TO ONE / SOME ARTISTS...
            'artists': (
                'SELECT distinct(artist || " - " || song) AS artistdashsong, '
                'id, artist, song, album, genre, file_path, duration FROM tracks '
                'WHERE (artist LIKE "%we butter%" OR artist LIKE "%eisbrecher%" '
                'OR artist LIKE "%black ang%" OR artist LIKE "%madison affair%") '
                'AND (NOT song LIKE "%%stripped%%" AND NOT song LIKE "%(2022)%" '
                'AND NOT song LIKE "%(live%%" AND NOT song LIKE "%%acoustic%%" '
                'AND NOT song LIKE "%%instrumental%%" AND NOT song LIKE "%%remix%%" '
                'AND NOT song LIKE "%%reimagined%%" AND NOT song LIKE "%%alternative%%" '
                'AND NOT song LIKE "%%unzipped%%") '
                'GROUP BY artistdashsong ORDER BY RANDOM()'  # ORDER BY album ASC, id ASC
            ),
        }
        # The artist-limited query is the one in effect (it previously
        # overwrote the genre-limited one); change the key to switch.
        db_query: str = playlist_queries['artists']

        async with sqlite3.connect(self.active_playlist_path,
                                   timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            async with await db_conn.execute(db_query) as db_cursor:
                results: list[sqlite3.Row] = await db_cursor.fetchall()
                self.active_playlist = [{
                    'uuid': str(uuid().hex),
                    'id': row['id'],
                    'artist': double_space.sub(' ', row['artist']).strip(),
                    'song': double_space.sub(' ', row['song']).strip(),
                    'album': double_space.sub(' ', row['album']).strip(),
                    'genre': row['genre'] if row['genre'] else 'Unknown',
                    'artistsong': double_space.sub(' ', row['artistdashsong']).strip(),
                    'file_path': row['file_path'],
                    'duration': row['duration'],
                } for row in results]
                logging.info("Populated active playlists with %s items",
                             len(self.active_playlist))
    except Exception:
        # Exception rather than a bare except, so that task cancellation
        # (asyncio.CancelledError) and KeyboardInterrupt still propagate.
        traceback.print_exc()
2025-02-15 21:09:33 -05:00
async def cache_album_art(self, track_id: int,
                          album_art: bytes) -> None:
    """
    Cache Album Art to SQLite DB
    Args:
        track_id (int): Track ID to update
        album_art (bytes): Album art data
    Returns:
        None
    """
    try:
        async with sqlite3.connect(self.active_playlist_path,
                                   timeout=2) as db_conn:
            async with await db_conn.execute("UPDATE tracks SET album_art = ? WHERE id = ?",
                                             (album_art, track_id,)):
                await db_conn.commit()
    except Exception:
        # Best-effort cache write: report and carry on. Exception rather
        # than a bare except, so task cancellation still propagates.
        traceback.print_exc()
async def get_album_art(self, track_id: Optional[int] = None,
                        file_path: Optional[str] = None) -> Optional[bytes]:
    """
    Get Album Art
    Args:
        track_id (Optional[int]): Track ID to query (takes precedence when both are given)
        file_path (Optional[str]): file_path to query (used only when track_id is not given)
    Returns:
        Optional[bytes]: The stored album_art value, or None if no row matched or the lookup failed
    """
    try:
        async with sqlite3.connect(self.active_playlist_path,
                                   timeout=2) as db_conn:
            db_conn.row_factory = sqlite3.Row
            query: str = "SELECT album_art FROM tracks WHERE id = ?"
            query_params: tuple = (track_id,)
            if file_path and not track_id:
                query = "SELECT album_art FROM tracks WHERE file_path = ?"
                query_params = (file_path,)
            async with await db_conn.execute(query,
                                             query_params) as db_cursor:
                result: Optional[Union[sqlite3.Row, bool]] = await db_cursor.fetchone()
                if not result or not isinstance(result, sqlite3.Row):
                    return None
                return result['album_art']
    except Exception:
        # Exception rather than a bare except, so task cancellation
        # (asyncio.CancelledError) still propagates.
        traceback.print_exc()
        return None
2025-02-11 20:01:07 -05:00
2025-02-15 21:09:33 -05:00
def get_queue_item_by_uuid(self,
                           uuid: str) -> Optional[tuple[int, dict]]:
    """
    Find the first queue entry carrying the given UUID
    Args:
        uuid: The UUID to search
    Returns:
        Optional[tuple[int, dict]]: (queue index, entry), or None if absent
    """
    matches = (
        (position, entry)
        for position, entry in enumerate(self.active_playlist)
        if entry.get('uuid') == uuid
    )
    return next(matches, None)
async def _ls_skip(self) -> bool:
    """
    Request a skip to the next track from the LiquidSoap server
    Args:
        None
    Returns:
        bool: True only if the server answered "OK"; False on any failure
    """
    try:
        skip_url = f"{self.ls_uri}/next"
        limits = ClientTimeout(connect=2, sock_read=2)
        async with ClientSession() as session:
            async with session.get(skip_url, timeout=limits) as request:
                request.raise_for_status()
                body: Optional[str] = await request.text()
                return body == "OK"
    except Exception as e:
        logging.debug("Skip failed: %s", str(e))
    return False  # failsafe
2025-02-10 20:29:57 -05:00
2025-02-15 21:09:33 -05:00
async def get_ai_song_info(self, artist: str,
                           song: str) -> Optional[str]:
    """
    Ask the GPT helper for commentary on a song
    Args:
        artist (str)
        song (str)
    Returns:
        Optional[str]: The completion text, or None if nothing came back
    """
    prompt: str = f" am going to listen to {song} by {artist}."
    response: Optional[str] = await self.gpt.get_completion(prompt)
    if response:
        return response
    logging.critical("No response received from GPT?")
    return None
async def webhook_song_change(self, track: dict) -> None:
    """
    Handles Song Change Outbounds (Webhooks)

    Posts a "Now Playing" embed to the sfm webhook, then asks GPT about
    the track and posts the reply to the gpt webhook. Failures are
    printed, not raised.
    Args:
        track (dict): Track to announce; reads 'song', 'artist', 'id',
            'duration', 'genre', 'file_path' and (optionally) 'album'
    Returns:
        None
    """
    json_headers: dict = {
        'content-type': 'application/json; charset=utf-8',
    }
    try:
        # First, send track info
        # splitext copes with paths that have no extension, where
        # rsplit(".")[1] would raise IndexError and abort the webhook.
        file_ext: str = os.path.splitext(track['file_path'])[1].lstrip(".")
        hook_data: dict = {
            'username': 'serious.FM',
            "embeds": [{
                "title": "Now Playing",
                # Outer double quotes: reusing the same quote character
                # inside an f-string is a syntax error before Python 3.12.
                "description": f"## {track['song']}\nby\n## {track['artist']}",
                "color": 0x30c56f,
                "thumbnail": {
                    # The trailing timestamp makes each URL unique,
                    # presumably to defeat thumbnail caching.
                    "url": f"https://api.codey.lol/radio/album_art?track_id={track['id']}&{int(time.time())}",
                },
                "fields": [
                    {
                        "name": "Duration",
                        "value": self.duration_conv(track['duration']),
                        "inline": True,
                    },
                    {
                        "name": "Genre",
                        "value": track['genre'] if track['genre'] else 'Unknown',
                        "inline": True,
                    },
                    {
                        "name": "Filetype",
                        "value": file_ext if file_ext else 'Unknown',
                        "inline": True,
                    },
                    {
                        "name": "Higher Res",
                        "value": "[stream/icecast](https://stream.codey.lol/sfm.ogg) | [web player](https://codey.lol/radio)",
                        "inline": True,
                    },
                    {
                        "name": "Album",
                        # .get(): not every queue item carries 'album'
                        "value": track.get('album') or "Unknown",
                    },
                ]
            }]
        }
        sfm_hook: str = self.webhooks['sfm'].get('hook')
        async with ClientSession() as session:
            async with await session.post(sfm_hook, json=hook_data,
                                          timeout=ClientTimeout(connect=5, sock_read=5),
                                          headers=json_headers) as request:
                request.raise_for_status()

        # Next, AI feedback
        ai_response: Optional[str] = await self.get_ai_song_info(track['artist'],
                                                                 track['song'])
        if not ai_response:
            return
        hook_data = {
            'username': 'GPT',
            "embeds": [{
                "title": "AI Feedback",
                "color": 0x35d0ff,
                "description": ai_response.strip(),
            }]
        }
        ai_hook: str = self.webhooks['gpt'].get('hook')
        async with ClientSession() as session:
            async with await session.post(ai_hook, json=hook_data,
                                          timeout=ClientTimeout(connect=5, sock_read=5),
                                          headers=json_headers) as request:
                request.raise_for_status()
    except Exception:
        # Webhooks are best-effort: report the failure and carry on
        traceback.print_exc()