Files
Keysat 3c31b1e8a5 Soft-delete + source-count diagnostics; thesis v4 (0.1.0:47)
- DELETE handlers soft-delete (set deleted_at) + cascade contact -> opps/comms/lp
  instead of hard-deleting (guardrail #3); list queries filter deleted rows.
- ingest: chunking excludes soft-deleted records; qdrant delete-by-source-id;
  sync prunes soft-deleted records' vectors incrementally.
- /api/system/status returns raw source-record counts for sanity-checking.
- docs/thesis-seed-v4.md (no "bet" language, scarcity-forward, freedom-tech as
  a banner option, tightened pillars, reworked segments + edge).

Soft-delete verified via the running HTTP server (delete -> hidden + row kept).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 12:20:38 -05:00

143 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""Phase-0 Workstream B4 — incremental, idempotent CRM -> Qdrant sync.
One command that keeps the index fresh:
1. Re-run deterministic entity resolution (cheap, idempotent, respects durable
fuzzy merges). Optionally re-run the local-Qwen fuzzy tier (--fuzzy).
2. Re-embed ONLY the source rows changed since the last sync (by updated_at);
the first run (or --recreate) is a full backfill.
3. Upsert with deterministic point ids (overwrite in place) and advance the
watermark. Logged to interaction_log.
Idempotent: re-running with no CRM changes embeds nothing. Watermark lives in an
`ingest_sync_state` table the pipeline owns.
python3 backend/ingest/sync.py --db data/crm_dev.db # incremental (full on first run)
python3 backend/ingest/sync.py --db data/crm_dev.db --recreate # force full rebuild
python3 backend/ingest/sync.py --db data/crm_dev.db --fuzzy # also run the Qwen fuzzy tier
LIMITATION: the CRM hard-deletes today, so a removed row's chunk is not pruned
incrementally (no tombstone). Until the DELETE handlers honor `deleted_at`, run a
periodic `--recreate` (or `backfill.py --recreate`) to drop orphans. Structural
entity-id changes (merges) are likewise best followed by a periodic full rebuild.
"""
import argparse
import json
import sqlite3
import uuid
from datetime import datetime, timezone
import backfill
import chunking
import config
import entity_resolution as er
import qdrant_io
_CHANGE_TABLES = [("communications", "communications"), ("contacts", "contacts"),
("lp_profiles", "lp_profiles"), ("opportunities", "opportunities"),
("organizations", "organizations"), ("fundraising_investors", "fundraising_investors")]
def _now():
# Match the CRM's updated_at format ("...Z") so the watermark compares
# correctly against source-row updated_at (server.now() in server.py).
return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
def _ensure_state(conn):
conn.execute("""CREATE TABLE IF NOT EXISTS ingest_sync_state (
key TEXT PRIMARY KEY, value TEXT, updated_at TEXT DEFAULT (datetime('now')))""")
def _state_get(conn, key):
r = conn.execute("SELECT value FROM ingest_sync_state WHERE key=?", (key,)).fetchone()
return r[0] if r else None
def _state_set(conn, key, value):
conn.execute("""INSERT INTO ingest_sync_state (key, value, updated_at) VALUES (?,?,?)
ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at""",
(key, value, _now()))
def _deleted_source_ids(conn, since):
"""CRM records soft-deleted since the watermark — their chunks get pruned."""
ids = set()
for tbl in ("contacts", "organizations", "opportunities", "communications", "lp_profiles"):
try:
for r in conn.execute(f"SELECT id FROM {tbl} WHERE deleted_at IS NOT NULL AND deleted_at > ?", (since,)):
ids.add(r["id"])
except Exception:
pass
return ids
def _changed_source_ids(conn, since):
changed = set()
for tbl, model in _CHANGE_TABLES:
for r in conn.execute(f"SELECT id FROM {tbl} WHERE updated_at > ?", (since,)):
changed.add((model, r["id"]))
if chunking._has_table(conn, "emails"):
for r in conn.execute("SELECT id FROM emails WHERE updated_at > ? AND is_matched=1", (since,)):
changed.add(("emails", r["id"]))
return changed
def run(db, recreate=False, fuzzy=False, batch=32):
# 1. refresh the canonical layer (deterministic always; fuzzy on request)
er.run(db)
if fuzzy:
import fuzzy_resolve
fuzzy_resolve.run(db)
conn = sqlite3.connect(db)
conn.row_factory = sqlite3.Row
_ensure_state(conn)
last = _state_get(conn, "last_sync_ts")
run_start = _now()
qdrant_io.create_collection(recreate=recreate)
qdrant_io.ensure_indexes()
all_chunks = chunking.build_chunks(conn)
if last is None or recreate:
mode, target = "full", all_chunks
else:
# Prune chunks of records soft-deleted since the last sync.
deleted = _deleted_source_ids(conn, last)
if deleted:
qdrant_io.delete_by_source_ids(deleted)
changed = _changed_source_ids(conn, last)
mode, target = "incremental", [c for c in all_chunks
if (c["source_model"], c["source_id"]) in changed]
written = backfill.embed_and_upsert(target, batch=batch, progress=False)
_state_set(conn, "last_sync_ts", run_start)
summary = {"mode": mode, "rows_embedded": written, "total_chunks": len(all_chunks),
"qdrant_points": qdrant_io.count()}
conn.execute("""INSERT INTO interaction_log
(id, ts, actor_type, actor_id, action, target_type, payload, source, created_at)
VALUES (?,?,?,?,?,?,?,?,?)""",
(str(uuid.uuid4()), _now(), "system", "ingest_sync", "ingest.sync", "crm_chunks",
json.dumps(summary), "ingest", _now()))
conn.commit()
conn.close()
return summary
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--db", default=config.DEFAULT_DB)
ap.add_argument("--recreate", action="store_true")
ap.add_argument("--fuzzy", action="store_true")
ap.add_argument("--batch", type=int, default=32)
args = ap.parse_args()
s = run(args.db, recreate=args.recreate, fuzzy=args.fuzzy, batch=args.batch)
print(f"Sync ({s['mode']}): embedded {s['rows_embedded']} chunk(s); "
f"{s['total_chunks']} total; Qdrant now holds {s['qdrant_points']} points.")
if __name__ == "__main__":
main()