"""SQLite connection + schema initialization. Boring and inspectable (§5)."""
from __future__ import annotations

import re
import sqlite3
from pathlib import Path

SCHEMA_FILE = Path(__file__).with_name("schema.sql")


def connect(db_path: str | Path) -> sqlite3.Connection:
    """Open the SQLite database at *db_path*, creating parent directories first.

    The returned connection yields ``sqlite3.Row`` rows, has foreign-key
    enforcement switched on, and waits up to 30 s on a locked database.
    """
    db_path = Path(db_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path), timeout=30)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    # wait, don't fail, under concurrent backfill writers
    conn.execute("PRAGMA busy_timeout = 30000")
    return conn


# Additive migrations for DBs created before a column existed
# (CREATE IF NOT EXISTS won't add columns).
_MIGRATIONS = {
    "documents": {"content_hash": "TEXT", "processed_at": "TEXT", "dedup_key": "TEXT"},
    # DESIGN_v2.1 condition 1: own_network = the Ten31 orbit (Odell/Bent partners etc.) — listening to
    # ourselves. Quarantined: a TEST FIXTURE for the reflexivity case, DROPPED in live EISC scoring.
    "sources": {"backtest_2022_2023": "TEXT", "own_network": "INTEGER"},
    # DESIGN_v2.1: tag derivatives by distance-from-edge for TRIAGE — surfaced, NEVER used as a filter
    # (an engine that pre-filters to in-mandate reproduces the AI/compute mandate-expansion miss).
    "fanout_nodes": {"distance_from_edge": "TEXT"},
}


def _widen_cluster_check(conn: sqlite3.Connection) -> None:
    """Add 'banks'/'credit'/'fintech' to sources.source_cluster's CHECK.

    SQLite can't ALTER a CHECK, so the (tiny) table is rebuilt via the standard
    table-swap: create ``sources_new`` with the widened DDL, copy the rows,
    drop ``sources``, rename. Idempotent: a no-op when the table is missing,
    already widened, or carries no ``source_cluster IN (...)`` clause.

    foreign_keys is toggled OFF around the swap (DROP would otherwise fail on
    inbound FKs); data is copied by value so referential integrity holds.
    busy_timeout (set in connect) lets it wait out concurrent backfill writers.

    The swap runs in one explicit transaction. If any step raises, the
    transaction is rolled back (so ``sources`` is untouched and no
    ``sources_new`` is left behind), foreign-key enforcement is switched back
    on, and the exception propagates to the caller.
    """
    row = conn.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND name='sources'"
    ).fetchone()
    if not row or "'banks'" in row[0]:
        return

    new_list = ("('macro','ai_tech','energy','bitcoin','vc_consensus','generalist',"
                "'banks','credit','fintech')")
    new_ddl, widened = re.subn(
        r"source_cluster IN\s*\([^)]*\)", f"source_cluster IN {new_list}", row[0], count=1
    )
    if not widened:
        # No CHECK to widen: rebuilding would be pure churn repeated on every call.
        return
    # The stored DDL may carry the table name double-quoted (SQLite rewrites it
    # that way after a RENAME), so accept both spellings.
    new_ddl = re.sub(
        r'CREATE TABLE\s+"?sources"?(?![\w"])', "CREATE TABLE sources_new", new_ddl, count=1
    )

    conn.commit()  # close any implicit txn: the FK pragma is a no-op inside a transaction
    conn.execute("PRAGMA foreign_keys=OFF")
    try:
        if not conn.in_transaction:
            # Explicit BEGIN so the DDL below is transactional too; Python's
            # implicit transactions only start at the first INSERT/UPDATE/DELETE.
            conn.execute("BEGIN IMMEDIATE")
        # DROP TABLE takes the table's explicit indexes and triggers with it;
        # remember their DDL so they can be recreated on the rebuilt table.
        dependents = [
            r[0]
            for r in conn.execute(
                "SELECT sql FROM sqlite_master "
                "WHERE tbl_name='sources' AND type IN ('index','trigger') AND sql IS NOT NULL"
            ).fetchall()
        ]
        # Leftover from an attempt that died before this function was transactional.
        conn.execute("DROP TABLE IF EXISTS sources_new")
        conn.execute(new_ddl)
        conn.execute("INSERT INTO sources_new SELECT * FROM sources")
        conn.execute("DROP TABLE sources")
        conn.execute("ALTER TABLE sources_new RENAME TO sources")
        for ddl in dependents:
            conn.execute(ddl)
        conn.commit()
    finally:
        if conn.in_transaction:
            # Still open means a step above failed: undo the partial swap.
            # It must also be closed before the pragma below can take effect.
            conn.rollback()
        conn.execute("PRAGMA foreign_keys=ON")


def _migrate(conn: sqlite3.Connection) -> None:
    """Bring an existing database up to the current schema.

    Adds every column listed in ``_MIGRATIONS`` that its table lacks, creates
    the indexes on the migrated ``documents`` columns, commits, then widens the
    ``sources.source_cluster`` CHECK. The tables must already exist.
    """
    for table, cols in _MIGRATIONS.items():
        existing = {r[1] for r in conn.execute(f"PRAGMA table_info({table})")}
        for col, typ in cols.items():
            if col not in existing:
                conn.execute(f"ALTER TABLE {table} ADD COLUMN {col} {typ}")
    # indexes on migrated columns (created here so they work on DBs predating the column)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_documents_content_hash ON documents(content_hash)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_documents_dedup_key ON documents(dedup_key)")
    conn.commit()
    _widen_cluster_check(conn)


def init_db(conn: sqlite3.Connection) -> None:
    """Idempotent: CREATE ... IF NOT EXISTS + additive column migrations."""
    # Explicit encoding: the default follows the locale and varies by platform.
    conn.executescript(SCHEMA_FILE.read_text(encoding="utf-8"))
    conn.commit()
    _migrate(conn)


def table_names(conn: sqlite3.Connection) -> list[str]:
    """Return the names of all tables and views in the database, sorted by name."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type IN ('table','view') ORDER BY name"
    ).fetchall()
    return [r[0] for r in rows]