Keysat c7ce44d963 Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server
Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:13:35 -05:00


"""Core-schema migration runner for the Ten31 CRM.

Phase 0 (Workstream A2) introduces ordered, reviewable SQL migrations for the
*core* schema, generalizing the pattern the Gmail subsystem already uses
(email_integration/db.py). Unlike the Gmail runner, this one keeps a
`schema_migrations` ledger so each file runs exactly once and the applied set is
auditable.

Design rules (CLAUDE.md guardrails):

* Additive and reversible only: migrations add tables / nullable columns;
  they never drop or rewrite existing data. Each NNNN_*.sql may ship a paired
  NNNN_*.down.sql for manual rollback (the .down files are never auto-applied).
* Idempotent: files use `CREATE TABLE/INDEX IF NOT EXISTS`. For the few
  non-idempotent `ALTER TABLE ... ADD COLUMN` statements (SQLite has no
  IF NOT EXISTS for columns), a partial-apply is tolerated by skipping
  "duplicate column name" errors, matching the existing defensive pattern in
  server.py:init_db().

Call `apply_core_migrations(conn)` from init_db() after the base tables exist.
"""

import os
import sqlite3

MIGRATIONS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations")


def _split_statements(sql: str):
    """Split a SQL script into individual statements on ';' boundaries.

    Sufficient for our migrations, which contain no procedural blocks or
    semicolons inside string literals.
    """
    return [s.strip() for s in sql.split(";") if s.strip()]


def _apply_statementwise(cursor, sql: str) -> None:
    """Execute a migration one statement at a time, tolerating an already-applied
    `ALTER TABLE ... ADD COLUMN` (duplicate column). Used only as a fallback when
    executescript() trips over a partially-applied migration."""
    for stmt in _split_statements(sql):
        try:
            cursor.execute(stmt)
        except sqlite3.OperationalError as exc:
            if "duplicate column name" in str(exc).lower():
                continue
            raise


def apply_core_migrations(conn) -> None:
    """Apply any pending backend/migrations/NNNN_*.sql files once, in order.

    Records each applied file in the `schema_migrations` ledger. `*.down.sql`
    files are ignored (manual rollback only).
    """
    cursor = conn.cursor()
    # Create the ledger first so the applied set can always be queried.
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS schema_migrations (
            filename TEXT PRIMARY KEY,
            applied_at TEXT DEFAULT (datetime('now'))
        )
        """
    )
    conn.commit()

    if not os.path.isdir(MIGRATIONS_DIR):
        return

    applied = {row[0] for row in cursor.execute("SELECT filename FROM schema_migrations")}
    # sorted() orders names lexicographically, so the NNNN prefix sets apply order.
    pending = sorted(
        fn for fn in os.listdir(MIGRATIONS_DIR)
        if fn.endswith(".sql") and not fn.endswith(".down.sql") and fn not in applied
    )

    for filename in pending:
        path = os.path.join(MIGRATIONS_DIR, filename)
        with open(path, "r", encoding="utf-8") as handle:
            sql = handle.read()
        try:
            cursor.executescript(sql)
        except sqlite3.OperationalError as exc:
            # Retry a partially applied file statement by statement;
            # any other error propagates and the file is not recorded.
            if "duplicate column name" in str(exc).lower():
                _apply_statementwise(cursor, sql)
            else:
                raise
        cursor.execute("INSERT INTO schema_migrations (filename) VALUES (?)", (filename,))
        conn.commit()
        print(f"[migrations] applied {filename}")
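
The ledger behaviour described above can be exercised in isolation. The following is a minimal sketch, not the runner itself: `apply_pending` is a hypothetical stand-in that takes the migrations directory as a parameter and always runs statement by statement (the real runner tries `executescript()` first), and the `contacts` table and the three migration filenames are invented for illustration. It assumes an SQLite build that reports an existing column as "duplicate column name", which is the message the runner already matches on.

```python
import os
import sqlite3
import tempfile


def apply_pending(conn, migrations_dir):
    """Hypothetical compact stand-in for apply_core_migrations(); returns the files it ran."""
    cur = conn.cursor()
    cur.execute(
        "CREATE TABLE IF NOT EXISTS schema_migrations ("
        "filename TEXT PRIMARY KEY, applied_at TEXT DEFAULT (datetime('now')))"
    )
    applied = {row[0] for row in cur.execute("SELECT filename FROM schema_migrations")}
    pending = sorted(
        fn for fn in os.listdir(migrations_dir)
        if fn.endswith(".sql") and not fn.endswith(".down.sql") and fn not in applied
    )
    for fn in pending:
        with open(os.path.join(migrations_dir, fn), encoding="utf-8") as handle:
            statements = [s.strip() for s in handle.read().split(";") if s.strip()]
        for stmt in statements:
            try:
                cur.execute(stmt)
            except sqlite3.OperationalError as exc:
                # Tolerate only an already-added column; re-raise everything else.
                if "duplicate column name" not in str(exc).lower():
                    raise
        cur.execute("INSERT INTO schema_migrations (filename) VALUES (?)", (fn,))
        conn.commit()
    return pending


def demo():
    # Illustrative migration files; names and schema are assumptions.
    files = {
        "0001_contacts.sql": "CREATE TABLE IF NOT EXISTS contacts (id INTEGER PRIMARY KEY);",
        "0002_add_deleted_at.sql": "ALTER TABLE contacts ADD COLUMN deleted_at TEXT;",
        "0002_add_deleted_at.down.sql": "ALTER TABLE contacts DROP COLUMN deleted_at;",
    }
    with tempfile.TemporaryDirectory() as tmp:
        for name, sql in files.items():
            with open(os.path.join(tmp, name), "w", encoding="utf-8") as handle:
                handle.write(sql)
        conn = sqlite3.connect(":memory:")
        first = apply_pending(conn, tmp)   # both forward files run, in name order
        second = apply_pending(conn, tmp)  # ledger makes the second pass a no-op
        # Simulate a partial apply: the column exists but the ledger row is missing.
        conn.execute(
            "DELETE FROM schema_migrations WHERE filename = ?",
            ("0002_add_deleted_at.sql",),
        )
        conn.commit()
        third = apply_pending(conn, tmp)   # duplicate column is skipped, file re-recorded
        columns = [row[1] for row in conn.execute("PRAGMA table_info(contacts)")]
        conn.close()
    return first, second, third, columns


if __name__ == "__main__":
    print(demo())
```

The `.down.sql` file is never picked up, the second pass finds nothing pending, and the third pass re-records the migration without altering the table again.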