#!/usr/bin/env python3
"""
SQLite -> PostgreSQL migrator.

This script integrates the download/check/notification helpers from
`update_lrclib_db.py` and migrates a given SQLite dump into a new **staging**
PostgreSQL database named with the dump date. After a successful import the
staging DB is swapped to replace the production DB (with a backup rename of
the previous DB).

Usage examples:
  - Default (auto-fetch + notify): ./migrate_sqlite_to_pg.py
  - Migrate a local file:          ./migrate_sqlite_to_pg.py --sqlite /path/to/db.sqlite3
  - Disable notifications:         ./migrate_sqlite_to_pg.py --no-notify
  - Force re-import:               ./migrate_sqlite_to_pg.py --force
"""

from __future__ import annotations

import argparse
import asyncio
import io
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import psycopg2  # type: ignore[import]
from dotenv import load_dotenv

# Import helpers from update_lrclib_db.py (async functions)
from update_lrclib_db import (
    fetch_latest_dump_info,
    download_and_extract_dump,
    parse_dump_date,
    notify_start,
    notify_new_dump_found,
    notify_success,
    notify_failure,
    load_state,
    save_state,
    UpsertReport,
    TableReport,
)

load_dotenv()

# ---------------------- Config ----------------------
# See the example .env sketch at the end of this file.
PG_HOST = os.getenv("POSTGRES_HOST", "localhost")
PG_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
PG_USER = os.getenv("POSTGRES_USER", "api")
PG_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
PG_DATABASE = os.getenv("POSTGRES_DB", "lrclib")
CHUNK_SIZE = int(os.getenv("MIGRATE_CHUNK_SIZE", "100000"))  # 100k rows per batch
DEFAULT_DEST_DIR = Path(os.getenv("LRCLIB_DUMP_DIR", "/nvme/tmp"))


# ---------------------- SQLite -> PostgreSQL helpers ----------------------


def sqlite_to_pg_type(sqlite_type: str) -> str:
    """Convert SQLite type to PostgreSQL type."""
    if not sqlite_type:
        return "TEXT"
    sqlite_type = sqlite_type.upper()
    # More specific keys (DATETIME/TIMESTAMP) must come before DATE so that
    # e.g. "DATETIME" does not match the "DATE" substring first.
    mapping = {
        "INTEGER": "BIGINT",
        "TEXT": "TEXT",
        "REAL": "DOUBLE PRECISION",
        "FLOAT": "DOUBLE PRECISION",
        "BLOB": "BYTEA",
        "NUMERIC": "NUMERIC",
        "BOOLEAN": "BOOLEAN",
        "DATETIME": "TIMESTAMP",
        "TIMESTAMP": "TIMESTAMPTZ",
        "DATE": "DATE",
    }
    for key, pg_type in mapping.items():
        if key in sqlite_type:
            return pg_type
    return "TEXT"


def get_sqlite_tables(conn: sqlite3.Connection) -> list[str]:
    """Get list of user tables from SQLite database."""
    cur = conn.cursor()
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';"
    )
    return [row[0] for row in cur.fetchall()]


def get_table_schema(conn: sqlite3.Connection, table: str) -> list[tuple[str, str]]:
    """Get column names and types for a SQLite table."""
    cur = conn.cursor()
    cur.execute(f"PRAGMA table_info({table});")
    return [(row[1], row[2]) for row in cur.fetchall()]


def clean_row(row: tuple, columns: list[tuple[str, str]]) -> tuple:
    """Clean row data for PostgreSQL insertion."""
    cleaned: list[Any] = []
    for i, value in enumerate(row):
        _col_name, col_type = columns[i]
        pg_type = sqlite_to_pg_type(col_type)
        if value is None:
            cleaned.append(None)
        elif pg_type == "BOOLEAN" and isinstance(value, int):
            cleaned.append(bool(value))
        elif isinstance(value, str):
            # Remove NULL bytes which PostgreSQL text fields reject
            cleaned.append(value.replace("\x00", ""))
        else:
            cleaned.append(value)
    return tuple(cleaned)


def escape_copy_value(value: Any, pg_type: str) -> str:
    """Escape a value for PostgreSQL COPY format (tab-separated).

    This is much faster than INSERT for bulk loading.
""" if value is None: return "\\N" # NULL marker for COPY if pg_type == "BOOLEAN" and isinstance(value, int): return "t" if value else "f" if isinstance(value, bool): return "t" if value else "f" if isinstance(value, (int, float)): return str(value) if isinstance(value, bytes): # BYTEA: encode as hex return "\\\\x" + value.hex() # String: escape special characters for COPY s = str(value) s = s.replace("\x00", "") # Remove NULL bytes s = s.replace("\\", "\\\\") # Escape backslashes s = s.replace("\t", "\\t") # Escape tabs s = s.replace("\n", "\\n") # Escape newlines s = s.replace("\r", "\\r") # Escape carriage returns return s def create_table( pg_conn, table: str, columns: list[tuple[str, str]], unlogged: bool = True ) -> None: """Create a table in PostgreSQL based on SQLite schema. Uses UNLOGGED tables by default for faster bulk import (no WAL writes). """ cur = pg_conn.cursor() # Check if table exists cur.execute( "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = %s);", (table,), ) if cur.fetchone()[0]: print(f" Table {table} already exists, skipping create") return col_defs = [f'"{name}" {sqlite_to_pg_type(coltype)}' for name, coltype in columns] unlogged_kw = "UNLOGGED" if unlogged else "" sql = f'CREATE {unlogged_kw} TABLE "{table}" ({", ".join(col_defs)});' print(f" Creating {'UNLOGGED ' if unlogged else ''}table: {table}") cur.execute(sql) pg_conn.commit() def migrate_table( sqlite_conn: sqlite3.Connection, pg_conn, table: str, columns: list[tuple[str, str]], ) -> TableReport: """Migrate data using PostgreSQL COPY for maximum speed. COPY is 5-10x faster than INSERT for bulk loading. """ import time start = time.time() report = TableReport(table_name=table) sqlite_cur = sqlite_conn.cursor() pg_cur = pg_conn.cursor() col_names = [c[0] for c in columns] col_names_quoted = ", ".join([f'"{c}"' for c in col_names]) pg_types = [sqlite_to_pg_type(c[1]) for c in columns] # Get row count sqlite_cur.execute(f'SELECT COUNT(*) FROM "{table}";') report.sqlite_rows = sqlite_cur.fetchone()[0] report.pg_rows_before = 0 # Fresh table if report.sqlite_rows == 0: print(f" Skipping {table}: empty table") report.pg_rows_after = 0 report.status = "success" report.duration_seconds = time.time() - start return report print(f" Importing {table}: {report.sqlite_rows:,} rows using COPY") offset = 0 total_copied = 0 errors = 0 while offset < report.sqlite_rows: try: sqlite_cur.execute( f'SELECT {col_names_quoted} FROM "{table}" LIMIT {CHUNK_SIZE} OFFSET {offset};' ) rows = sqlite_cur.fetchall() if not rows: break # Build COPY data in memory buffer = io.StringIO() for row in rows: line_parts = [ escape_copy_value(val, pg_types[i]) for i, val in enumerate(row) ] buffer.write("\t".join(line_parts) + "\n") buffer.seek(0) # Use COPY FROM for fast bulk insert pg_cur.copy_expert( f'COPY "{table}" ({col_names_quoted}) FROM STDIN WITH (FORMAT text)', buffer, ) pg_conn.commit() total_copied += len(rows) offset += CHUNK_SIZE pct = min(100.0, (offset / report.sqlite_rows) * 100) elapsed = time.time() - start rate = total_copied / elapsed if elapsed > 0 else 0 print( f" Progress: {min(offset, report.sqlite_rows):,}/{report.sqlite_rows:,} ({pct:.1f}%) - {rate:,.0f} rows/sec", end="\r", ) except Exception as e: print(f"\n ⚠ Error at offset {offset} for table {table}: {e}") errors += 1 report.errors += 1 offset += CHUNK_SIZE pg_conn.rollback() if errors > 10: report.status = "failed" report.error_message = f"Too many errors ({errors})" break print() # newline after progress # Final count 
    pg_cur.execute(f'SELECT COUNT(*) FROM "{table}";')
    report.pg_rows_after = pg_cur.fetchone()[0]
    report.rows_affected = total_copied
    report.duration_seconds = time.time() - start
    if report.status != "failed":
        report.status = "success"

    rate = (
        report.rows_affected / report.duration_seconds
        if report.duration_seconds > 0
        else 0
    )
    print(
        f" ✓ Table {table}: {report.rows_affected:,} rows in {report.duration_seconds:.1f}s ({rate:,.0f} rows/sec)"
    )
    return report


# ---------------------- PostgreSQL DB management ----------------------


def pg_connect(dbname: Optional[str] = None):
    """Connect to PostgreSQL database."""
    return psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        database=dbname or "postgres",
        user=PG_USER,
        password=PG_PASSWORD,
    )


def create_database(db_name: str) -> None:
    """Create a new PostgreSQL database."""
    print(f"Creating database: {db_name}")
    conn = pg_connect("postgres")
    conn.autocommit = True
    cur = conn.cursor()

    cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (db_name,))
    if cur.fetchone():
        print(f" Database {db_name} already exists")
        cur.close()
        conn.close()
        return

    # Try creating database; handle collation mismatch gracefully
    try:
        cur.execute(f'CREATE DATABASE "{db_name}" OWNER {PG_USER};')
        print(f" Created database {db_name}")
    except Exception as e:
        err = str(e)
        if "collation version" in err or "template1" in err:
            print(
                " ⚠ Detected collation mismatch on template1; retrying with TEMPLATE template0"
            )
            cur.execute(
                f'CREATE DATABASE "{db_name}" OWNER {PG_USER} TEMPLATE template0;'
            )
            print(f" Created database {db_name} using TEMPLATE template0")
        else:
            raise

    cur.close()
    conn.close()


def terminate_connections(db_name: str, max_wait: int = 10) -> bool:
    """Terminate all connections to a database.

    Returns True if all connections were terminated, False if some remain.
    Won't fail on permission errors (e.g., can't terminate superuser connections).
""" import time conn = pg_connect("postgres") conn.autocommit = True cur = conn.cursor() for attempt in range(max_wait): # Check how many connections exist cur.execute( "SELECT COUNT(*) FROM pg_stat_activity WHERE datname = %s AND pid <> pg_backend_pid();", (db_name,), ) row = cur.fetchone() count = int(row[0]) if row else 0 if count == 0: cur.close() conn.close() return True print(f" Terminating {count} connection(s) to {db_name}...") # Try to terminate - ignore errors for connections we can't kill try: cur.execute( """ SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = %s AND pid <> pg_backend_pid() AND usename = current_user; -- Only terminate our own connections """, (db_name,), ) except Exception as e: print(f" Warning: {e}") # Brief wait for connections to close time.sleep(1) # Final check cur.execute( "SELECT COUNT(*) FROM pg_stat_activity WHERE datname = %s AND pid <> pg_backend_pid();", (db_name,), ) row = cur.fetchone() remaining = int(row[0]) if row else 0 cur.close() conn.close() if remaining > 0: print(f" Warning: {remaining} connection(s) still active (may be superuser sessions)") return False return True def database_exists(db_name: str) -> bool: """Check if a database exists.""" conn = pg_connect("postgres") conn.autocommit = True cur = conn.cursor() cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (db_name,)) exists = cur.fetchone() is not None cur.close() conn.close() return exists def rename_database(old_name: str, new_name: str) -> None: """Rename a PostgreSQL database.""" conn = pg_connect("postgres") conn.autocommit = True cur = conn.cursor() print(f" Renaming {old_name} -> {new_name}") cur.execute(f'ALTER DATABASE "{old_name}" RENAME TO "{new_name}";') cur.close() conn.close() def drop_database(db_name: str) -> bool: """Drop a PostgreSQL database. Returns True if dropped, False if failed (e.g., active connections). """ # First try to terminate connections terminate_connections(db_name) conn = pg_connect("postgres") conn.autocommit = True cur = conn.cursor() print(f" Dropping database {db_name}") try: cur.execute(f'DROP DATABASE IF EXISTS "{db_name}";') cur.close() conn.close() return True except Exception as e: print(f" Warning: Could not drop {db_name}: {e}") cur.close() conn.close() return False conn.close() # ---------------------- Main migration flow ---------------------- def run_migration( sqlite_path: str, dump_dt: Optional[datetime], notify: bool = True, dry_run: bool = False, ) -> UpsertReport: """ Create a timestamped staging DB, import data, then swap to production. Returns an UpsertReport with migration details. 
""" dump_dt = dump_dt or datetime.now(timezone.utc) ts = dump_dt.strftime("%Y%m%dT%H%M%SZ") staging_db = f"{PG_DATABASE}_{ts}" backup_db = f"{PG_DATABASE}_backup_{ts}" # Initialize report report = UpsertReport(sqlite_source=sqlite_path, dump_date=ts) report.start_time = datetime.now(timezone.utc) print(f"\n{'=' * 60}") print("SQLite -> PostgreSQL Migration") print(f"{'=' * 60}") print(f"Source: {sqlite_path}") print(f"Staging DB: {staging_db}") print(f"Production: {PG_DATABASE}") print(f"Chunk size: {CHUNK_SIZE:,}") print(f"{'=' * 60}\n") if dry_run: print("DRY RUN - no changes will be made") return report # Create staging database create_database(staging_db) sqlite_conn = None pg_conn = None try: sqlite_conn = sqlite3.connect(sqlite_path) pg_conn = pg_connect(staging_db) tables = get_sqlite_tables(sqlite_conn) print(f"Found {len(tables)} tables: {', '.join(tables)}\n") for i, table in enumerate(tables, 1): print(f"[{i}/{len(tables)}] Processing: {table}") columns = get_table_schema(sqlite_conn, table) create_table(pg_conn, table, columns) table_report = migrate_table(sqlite_conn, pg_conn, table, columns) report.tables.append(table_report) print() # Close connections before renaming pg_conn.close() pg_conn = None sqlite_conn.close() sqlite_conn = None # Perform the database swap print(f"\n{'=' * 60}") print("Migration complete — performing database swap") print(f"{'=' * 60}\n") terminate_connections(PG_DATABASE) terminate_connections(staging_db) if database_exists(PG_DATABASE): print(f" Backing up production DB to {backup_db}") rename_database(PG_DATABASE, backup_db) else: print(f" Production DB {PG_DATABASE} does not exist; nothing to back up") rename_database(staging_db, PG_DATABASE) print("\n✓ Database swap complete!") # Update state state = load_state() state["last_dump_date"] = dump_dt.strftime("%Y-%m-%d %H:%M:%S") state["last_upsert_time"] = datetime.now(timezone.utc).isoformat() save_state(state) report.end_time = datetime.now(timezone.utc) # Send success notification if notify: asyncio.run(notify_success(report)) return report except Exception as e: print(f"\n✗ Migration failed: {e}") # Cleanup staging DB on failure try: if pg_conn: pg_conn.close() terminate_connections(staging_db) drop_database(staging_db) except Exception as cleanup_err: print(f" Cleanup error: {cleanup_err}") if notify: asyncio.run(notify_failure(str(e), "Migration")) raise finally: if sqlite_conn: try: sqlite_conn.close() except Exception: pass if pg_conn: try: pg_conn.close() except Exception: pass # ---------------------- CLI ---------------------- def main() -> None: parser = argparse.ArgumentParser( description="Migrate SQLite dump to PostgreSQL with atomic swap", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s # Auto-fetch latest dump and migrate %(prog)s --sqlite /path/to/db # Migrate local SQLite file %(prog)s --force # Force migration even if dump is not newer %(prog)s --no-notify # Disable Discord notifications %(prog)s --dry-run # Simulate without making changes """, ) parser.add_argument( "--sqlite", type=str, metavar="PATH", help="Path to local SQLite file (skips auto-fetch if provided)", ) parser.add_argument( "--dest", type=str, metavar="DIR", help="Destination directory for downloaded dumps", ) parser.add_argument( "--no-notify", action="store_true", help="Disable Discord notifications (enabled by default)", ) parser.add_argument( "--force", action="store_true", help="Force migration even if dump date is not newer", ) parser.add_argument( "--dry-run", 
action="store_true", help="Simulate without making changes", ) args = parser.parse_args() notify_enabled = not args.no_notify sqlite_path: Optional[str] = None dump_dt: Optional[datetime] = None try: if args.sqlite: # Use provided SQLite file sqlite_path = args.sqlite assert sqlite_path is not None if not Path(sqlite_path).exists(): print(f"Error: SQLite file not found: {sqlite_path}") sys.exit(1) # Try to parse date from filename parsed = parse_dump_date(Path(sqlite_path).name) dump_dt = parsed or datetime.now(timezone.utc) else: # Auto-fetch latest dump if notify_enabled: asyncio.run(notify_start()) print("Checking for latest dump on lrclib.net...") latest = asyncio.run(fetch_latest_dump_info()) if not latest: print("Error: Could not fetch latest dump info") if notify_enabled: asyncio.run(notify_failure("Could not fetch dump info", "Fetch")) sys.exit(1) dump_date = latest["date"] dump_date_str = dump_date.strftime("%Y-%m-%d %H:%M:%S") print(f"Latest dump: {latest['filename']} ({dump_date_str})") # Check if we need to update state = load_state() last_str = state.get("last_dump_date") if last_str and not args.force: try: last_dt = datetime.strptime(last_str, "%Y-%m-%d %H:%M:%S") if dump_date <= last_dt: print(f"No new dump available (last: {last_str})") print("Use --force to migrate anyway") sys.exit(0) except ValueError: pass # Invalid date format, proceed with migration print(f"New dump available: {dump_date_str}") if notify_enabled: asyncio.run( notify_new_dump_found(latest["filename"], dump_date_str) ) # Download print(f"\nDownloading {latest['filename']}...") db_path, err = asyncio.run( download_and_extract_dump(latest["url"], args.dest) ) if not db_path: print(f"Error: Download failed: {err}") if notify_enabled: asyncio.run(notify_failure(str(err), "Download")) sys.exit(1) sqlite_path = db_path dump_dt = dump_date # Run migration assert sqlite_path is not None run_migration( sqlite_path=sqlite_path, dump_dt=dump_dt, notify=notify_enabled, dry_run=args.dry_run, ) print("\n✓ Migration completed successfully!") except KeyboardInterrupt: print("\n\nInterrupted by user") sys.exit(130) except Exception as e: print(f"\nFatal error: {e}") if notify_enabled: asyncio.run(notify_failure(str(e), "Migration")) sys.exit(1) if __name__ == "__main__": main()