import os
import logging
import asyncio
import aiosqlite as sqlite3

csv_file_path: str = "artist_genre_imp.csv"  # current directory
db_file_path: str = os.path.join(
    "/usr/local/share", "sqlite_dbs", "artist_genre_map.db"
)
track_db_file_path: str = os.path.join(
    "/usr/local/share", "sqlite_dbs", "track_file_map.db"
)
# In-memory artist -> genre map filled by process_csv (first occurrence wins).
artist_genre: dict[str, str] = {}

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


async def process_csv(file_path: str) -> None:
    """
    Load the CSV containing artist/genre pairs to memory, then to SQLite

    Format:
        Artist,Genre

    Allows for commas in artist name by splitting at final comma.
    Blank lines and lines without an artist (no comma, or nothing before
    the final comma) are skipped. When an artist appears more than once,
    the first genre seen is kept.

    Args:
        file_path: Path of the UTF-8 encoded CSV file to read.
    """
    logging.info("Loading %s", file_path)
    with open(file_path, "r", encoding="utf-8") as f:
        lines = f.readlines()
    logging.info("Read %s lines", len(lines))

    for line in lines:
        # rpartition splits at the LAST comma, so commas inside the artist
        # name are preserved; `sep` is empty when the line has no comma.
        artist, sep, genre = line.strip().rpartition(",")
        artist = artist.strip()
        genre = genre.strip()
        if not sep or not artist:
            continue  # Blank or malformed line, nothing usable to insert
        if artist in artist_genre:
            continue  # Already in dict, skip
        artist_genre[artist] = genre

    logging.info("Processing %s artist/genre pairs", len(artist_genre))
    async with sqlite3.connect(db_file_path, timeout=5) as _db:
        for artist, genre in artist_genre.items():
            res = await _db.execute_insert(
                "INSERT INTO artist_genre (artist, genre) VALUES(?, ?)",
                (artist, genre),
            )
            if not res:
                logging.debug("Failed to insert %s", artist)
                continue  # Do not also report a failed row as inserted
            logging.debug("Inserted id# %s", res)
        logging.info("Committing")
        await _db.commit()


async def process_from_legacy(legacy_db_path: str) -> None:
    """
    Load existing tagged artists from the db (track_file_map)
    For migration to new db

    Reads every distinct artist/genre pair whose genre is not 'Untagged'
    from the `tracks` table of the legacy database and inserts it into the
    `artist_genre` table of the new database (db_file_path) using
    INSERT OR IGNORE.

    Args:
        legacy_db_path: Path of the legacy SQLite database to read from.
    """
    logging.info("SQLite: Connecting to %s and %s", db_file_path, legacy_db_path)
    async with sqlite3.connect(db_file_path, timeout=5) as new_db:
        logging.info("Connected to new db")
        # Connect to the path the caller passed in (previously the global
        # track_db_file_path was used and the argument was ignored).
        async with sqlite3.connect(legacy_db_path, timeout=30) as legacy_db:
            logging.info("Connected to legacy db")
            legacy_db.row_factory = sqlite3.Row  # rows addressable by column name
            async with legacy_db.execute(
                "SELECT distinct(artist), genre FROM tracks WHERE genre != 'Untagged'"
            ) as _cursor:
                rows = await _cursor.fetchall()
            logging.info("Read %s items from legacy db", len(rows))

            for row in rows:
                artist = row["artist"].strip()
                genre = row["genre"].strip()
                res = await new_db.execute_insert(
                    "INSERT OR IGNORE INTO artist_genre (artist, genre) VALUES(?, ?)",
                    (artist, genre),
                )
                logging.info("Inserted id# %s", res)
            logging.info("Committing to new_db")
            await new_db.commit()


def __init__():
    """
    Script entry point: run the legacy-db migration to completion, then return.

    asyncio.run drives the coroutine until it finishes and propagates any
    exception it raises, so the script exits when the migration is done
    instead of idling in an event loop forever.
    """
    # To import from the CSV instead, run process_csv(csv_file_path) here.
    asyncio.run(process_from_legacy(track_db_file_path))


if __name__ == "__main__":
    __init__()