Files

82 lines
3.8 KiB
Python

"""SQLite connection + schema initialization. Boring and inspectable (§5)."""
from __future__ import annotations
import sqlite3
from pathlib import Path
# schema.sql lives next to this module; init_db() executes it.
SCHEMA_FILE = Path(__file__).parent / "schema.sql"
def connect(db_path: Path) -> sqlite3.Connection:
    """Open a SQLite connection at ``db_path``, creating parent directories first.

    ``db_path`` may be a ``Path`` or a ``str``. The returned connection yields
    ``sqlite3.Row`` rows, has foreign-key enforcement switched on, and waits up to
    30 s on a locked database rather than failing immediately.
    """
    target = Path(db_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(target), timeout=30)
    conn.row_factory = sqlite3.Row
    # busy_timeout: wait, don't fail, under concurrent backfill writers.
    for pragma in ("PRAGMA foreign_keys = ON", "PRAGMA busy_timeout = 30000"):
        conn.execute(pragma)
    return conn
# Additive column migrations, keyed table -> {column: SQL type}. They exist for databases
# created before a column was added: CREATE TABLE IF NOT EXISTS never adds columns to an
# existing table, so _migrate() issues ALTER TABLE ... ADD COLUMN for any that are missing.
_MIGRATIONS = {
    "documents": {
        "content_hash": "TEXT",
        "processed_at": "TEXT",
        "dedup_key": "TEXT",
    },
    "sources": {
        "backtest_2022_2023": "TEXT",
        # DESIGN_v2.1 condition 1: own_network marks the Ten31 orbit (Odell/Bent partners
        # etc.), i.e. listening to ourselves. Quarantined: a TEST FIXTURE for the
        # reflexivity case, DROPPED in live EISC scoring.
        "own_network": "INTEGER",
    },
    "fanout_nodes": {
        # DESIGN_v2.1: tags derivatives by distance-from-edge for TRIAGE. Surfaced, NEVER
        # used as a filter (an engine that pre-filters to in-mandate reproduces the
        # AI/compute mandate-expansion miss).
        "distance_from_edge": "TEXT",
    },
}
def _widen_cluster_check(conn: sqlite3.Connection) -> None:
    """Add 'banks'/'credit'/'fintech' to the CHECK on sources.source_cluster.

    SQLite can't ALTER a CHECK, so the (tiny) table is rebuilt via the standard
    table swap:
    1. Create ``sources_new`` from the edited DDL.
    2. Copy rows by value.
    3. Drop ``sources``.
    4. Rename ``sources_new`` to ``sources``.
    5. Re-create the explicit indexes and triggers that DROP TABLE removed.

    Idempotent: a no-op when ``sources`` is missing, when its DDL already
    mentions 'banks', or when no ``source_cluster IN (...)`` list is found to
    widen.

    Transactions: the whole swap runs inside one ``BEGIN IMMEDIATE``
    transaction. On any error it is rolled back completely, so no half-built
    ``sources_new`` survives. The write lock is taken up front, and
    busy_timeout (set in connect) lets it wait out concurrent backfill
    writers.

    Foreign keys: they are switched OFF around the swap, because DROP would
    otherwise fail on inbound FKs, and switched back ON afterwards. That pragma
    is a no-op inside a transaction, hence the commit/rollback before each
    toggle. Data is copied by value, so referential integrity holds.

    NOTE(review): views (or triggers on other tables) that reference
    ``sources`` are not handled here. Newer SQLite versions may reject the
    RENAME while ``sources`` is dropped; confirm against schema.sql.
    """
    import re

    row = conn.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='sources'").fetchone()
    if not row or "'banks'" in row[0]:
        return
    new_list = ("('macro','ai_tech','energy','bitcoin','vc_consensus','generalist',"
                "'banks','credit','fintech')")
    new_ddl, n_subs = re.subn(r"source_cluster IN\s*\([^)]*\)", f"source_cluster IN {new_list}", row[0], count=1)
    if not n_subs:
        # Nothing recognisable to widen; rebuilding would just copy the table unchanged on every start-up.
        return
    new_ddl = new_ddl.replace("CREATE TABLE sources", "CREATE TABLE sources_new", 1)

    conn.commit()  # close any implicit txn so the FK pragma below takes effect
    conn.execute("PRAGMA foreign_keys=OFF")
    try:
        # Explicit txn: otherwise Python's sqlite3 auto-commits the CREATE TABLE on its own.
        conn.execute("BEGIN IMMEDIATE")
        try:
            # DROP TABLE sources also drops its indexes/triggers: capture their DDL to replay after the rename.
            # Auto-indexes (from inline UNIQUE/PK constraints) have NULL sql and come back with the new DDL.
            dependents = [r[0] for r in conn.execute(
                "SELECT sql FROM sqlite_master WHERE tbl_name='sources' "
                "AND type IN ('index','trigger') AND sql IS NOT NULL")]
            # An earlier, non-transactional run of this swap may have left a committed scratch table behind.
            conn.execute("DROP TABLE IF EXISTS sources_new")
            conn.execute(new_ddl)
            conn.execute("INSERT INTO sources_new SELECT * FROM sources")
            conn.execute("DROP TABLE sources")
            conn.execute("ALTER TABLE sources_new RENAME TO sources")
            for ddl in dependents:
                conn.execute(ddl)
            conn.commit()
        except BaseException:
            conn.rollback()  # leave no open txn, so the FK pragma in `finally` is not a no-op
            raise
    finally:
        conn.execute("PRAGMA foreign_keys=ON")
def _migrate(conn: sqlite3.Connection) -> None:
    """Bring a database created by an older schema up to date, additively.

    Steps, in order:
    1. For each table in ``_MIGRATIONS``, add every column that
       ``PRAGMA table_info`` does not already report.
    2. Ensure the indexes on the migrated ``documents`` columns exist.
    3. Commit.
    4. Widen the ``sources.source_cluster`` CHECK via ``_widen_cluster_check``.
    """
    for table_name, wanted in _MIGRATIONS.items():
        # Field 1 of each PRAGMA table_info row is the column name.
        present = {info[1] for info in conn.execute(f"PRAGMA table_info({table_name})")}
        missing = [(name, sql_type) for name, sql_type in wanted.items() if name not in present]
        for column_name, sql_type in missing:
            conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {sql_type}")
    # Created here, after the columns are guaranteed to exist, so they also work on DBs predating the columns.
    for index_sql in (
        "CREATE INDEX IF NOT EXISTS idx_documents_content_hash ON documents(content_hash)",
        "CREATE INDEX IF NOT EXISTS idx_documents_dedup_key ON documents(dedup_key)",
    ):
        conn.execute(index_sql)
    conn.commit()
    _widen_cluster_check(conn)
def init_db(conn: sqlite3.Connection) -> None:
    """Create the schema, then bring an older database up to date.

    Idempotent: CREATE ... IF NOT EXISTS (from schema.sql) + additive column migrations
    (``_migrate``).

    The schema file is decoded explicitly as UTF-8. ``read_text()`` without an
    encoding uses the platform's locale encoding (e.g. cp1252 on Windows), which
    could mangle any non-ASCII text in schema.sql.
    """
    conn.executescript(SCHEMA_FILE.read_text(encoding="utf-8"))
    conn.commit()
    _migrate(conn)
def table_names(conn: sqlite3.Connection) -> list[str]:
    """Return the names of all tables and views in ``conn``'s main schema.

    Names come from sqlite_master, ordered by name under SQLite's default BINARY
    collation. Indexes and triggers are excluded by the type filter.
    """
    query = "SELECT name FROM sqlite_master WHERE type IN ('table','view') ORDER BY name"
    return [record[0] for record in conn.execute(query)]