"""Core-schema migration runner for the Ten31 CRM. Phase 0 (Workstream A2) introduces ordered, reviewable SQL migrations for the *core* schema, generalizing the pattern the Gmail subsystem already uses (email_integration/db.py). Unlike the Gmail runner, this one keeps a `schema_migrations` ledger so each file runs exactly once and the applied set is auditable. Design rules (CLAUDE.md guardrails): * Additive and reversible only — migrations add tables / nullable columns; they never drop or rewrite existing data. Each NNNN_*.sql may ship a paired NNNN_*.down.sql for manual rollback (the .down files are never auto-applied). * Idempotent — files use `CREATE TABLE/INDEX IF NOT EXISTS`. For the few non-idempotent `ALTER TABLE ... ADD COLUMN` statements (SQLite has no IF NOT EXISTS for columns), a partial-apply is tolerated by skipping "duplicate column name" errors, matching the existing defensive pattern in server.py:init_db(). Call `apply_core_migrations(conn)` from init_db() after the base tables exist. """ import os import sqlite3 MIGRATIONS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations") def _split_statements(sql: str): """Split a SQL script into individual statements on ';' boundaries. Sufficient for our migrations, which contain no procedural blocks or semicolons inside string literals. """ return [s.strip() for s in sql.split(";") if s.strip()] def _apply_statementwise(cursor, sql: str) -> None: """Execute a migration one statement at a time, tolerating an already-applied `ALTER TABLE ... ADD COLUMN` (duplicate column). Used only as a fallback when executescript() trips over a partially-applied migration.""" for stmt in _split_statements(sql): try: cursor.execute(stmt) except sqlite3.OperationalError as exc: if "duplicate column name" in str(exc).lower(): continue raise def apply_core_migrations(conn) -> None: """Apply any pending backend/migrations/NNNN_*.sql files once, in order. Records each applied file in the `schema_migrations` ledger. `*.down.sql` files are ignored (manual rollback only). """ cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS schema_migrations ( filename TEXT PRIMARY KEY, applied_at TEXT DEFAULT (datetime('now')) ) """ ) conn.commit() if not os.path.isdir(MIGRATIONS_DIR): return applied = {row[0] for row in cursor.execute("SELECT filename FROM schema_migrations")} pending = sorted( fn for fn in os.listdir(MIGRATIONS_DIR) if fn.endswith(".sql") and not fn.endswith(".down.sql") and fn not in applied ) for filename in pending: path = os.path.join(MIGRATIONS_DIR, filename) with open(path, "r", encoding="utf-8") as handle: sql = handle.read() try: cursor.executescript(sql) except sqlite3.OperationalError as exc: if "duplicate column name" in str(exc).lower(): _apply_statementwise(cursor, sql) else: raise cursor.execute("INSERT INTO schema_migrations (filename) VALUES (?)", (filename,)) conn.commit() print(f"[migrations] applied {filename}")