#!/usr/bin/env python3
"""Phase-0 Workstream B4 — incremental, idempotent CRM -> Qdrant sync.

One command that keeps the index fresh:
  1. Re-run deterministic entity resolution (cheap, idempotent, respects
     durable fuzzy merges). Optionally re-run the local-Qwen fuzzy tier
     (--fuzzy).
  2. Re-embed ONLY the source rows changed since the last sync (by
     updated_at); the first run (or --recreate) is a full backfill.
  3. Upsert with deterministic point ids (overwrite in place) and advance
     the watermark. Logged to interaction_log.

Idempotent: re-running with no CRM changes embeds nothing. Watermark lives in
an `ingest_sync_state` table the pipeline owns.

  python3 backend/ingest/sync.py --db data/crm_dev.db              # incremental (full on first run)
  python3 backend/ingest/sync.py --db data/crm_dev.db --recreate   # force full rebuild
  python3 backend/ingest/sync.py --db data/crm_dev.db --fuzzy      # also run the Qwen fuzzy tier

LIMITATION: the CRM hard-deletes today, so a removed row's chunk is not pruned
incrementally (no tombstone). Until the DELETE handlers honor `deleted_at`,
run a periodic `--recreate` (or `backfill.py --recreate`) to drop orphans.
Structural entity-id changes (merges) are likewise best followed by a
periodic full rebuild.
"""
import argparse
import json
import sqlite3
import uuid
from datetime import datetime, timezone

import backfill
import chunking
import config
import entity_resolution as er
import qdrant_io

# (table name, chunk source_model) pairs scanned for rows with a newer
# updated_at than the watermark.
_CHANGE_TABLES = [
    ("communications", "communications"),
    ("contacts", "contacts"),
    ("lp_profiles", "lp_profiles"),
    ("opportunities", "opportunities"),
    ("organizations", "organizations"),
    ("fundraising_investors", "fundraising_investors"),
]


def _now() -> str:
    """Return the current UTC time as a naive ISO-8601 string suffixed with "Z"."""
    # Match the CRM's updated_at format ("...Z") so the watermark compares
    # correctly against source-row updated_at (server.now() in server.py).
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def _ensure_state(conn) -> None:
    """Create the `ingest_sync_state` key/value table if it does not exist."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS ingest_sync_state (
            key TEXT PRIMARY KEY,
            value TEXT,
            updated_at TEXT DEFAULT (datetime('now')))"""
    )


def _state_get(conn, key: str):
    """Return the stored value for *key*, or None when the key is absent."""
    r = conn.execute(
        "SELECT value FROM ingest_sync_state WHERE key=?", (key,)
    ).fetchone()
    return r[0] if r else None


def _state_set(conn, key: str, value: str) -> None:
    """Insert or overwrite *key* with *value*, stamping updated_at with `_now()`.

    Does not commit; the caller owns the transaction.
    """
    conn.execute(
        """INSERT INTO ingest_sync_state (key, value, updated_at)
           VALUES (?,?,?)
           ON CONFLICT(key) DO UPDATE SET value=excluded.value,
                                          updated_at=excluded.updated_at""",
        (key, value, _now()),
    )


def _deleted_source_ids(conn, since: str) -> set:
    """CRM records soft-deleted since the watermark — their chunks get pruned.

    Returns the set of `id` values, across the scanned tables, whose
    `deleted_at` is set and later than *since*. Expects `conn.row_factory`
    to be `sqlite3.Row` (rows are read by column name).
    """
    # NOTE(review): this list omits fundraising_investors and emails, which
    # _changed_source_ids does scan — confirm whether that is intentional.
    tables = ("contacts", "organizations", "opportunities",
              "communications", "lp_profiles")
    ids = set()
    for tbl in tables:
        try:
            rows = conn.execute(
                f"SELECT id FROM {tbl} WHERE deleted_at IS NOT NULL AND deleted_at > ?",
                (since,),
            ).fetchall()
        except sqlite3.OperationalError:
            # Best-effort: a table without a `deleted_at` column (or a missing
            # table) simply contributes nothing. Any other database error is
            # a real failure and is allowed to propagate.
            continue
        ids.update(r["id"] for r in rows)
    return ids


def _changed_source_ids(conn, since: str) -> set:
    """Return `(source_model, id)` pairs for rows with `updated_at > since`.

    Scans every table in `_CHANGE_TABLES`, plus matched rows (`is_matched=1`)
    of `emails` when `chunking._has_table` reports that table exists. Expects
    `conn.row_factory` to be `sqlite3.Row`.
    """
    changed = set()
    for tbl, model in _CHANGE_TABLES:
        for r in conn.execute(
            f"SELECT id FROM {tbl} WHERE updated_at > ?", (since,)
        ):
            changed.add((model, r["id"]))
    if chunking._has_table(conn, "emails"):
        for r in conn.execute(
            "SELECT id FROM emails WHERE updated_at > ? AND is_matched=1",
            (since,),
        ):
            changed.add(("emails", r["id"]))
    return changed


def run(db, recreate=False, fuzzy=False, batch=32):
    """Run one sync pass against the SQLite database at *db*.

    Args:
        db: Path of the CRM SQLite database.
        recreate: Passed to `qdrant_io.create_collection`; also forces a
            full re-embed regardless of the stored watermark.
        fuzzy: Also run `fuzzy_resolve.run(db)` after entity resolution.
        batch: Batch size forwarded to `backfill.embed_and_upsert`.

    Returns:
        A summary dict with keys `mode` ("full" or "incremental"),
        `rows_embedded`, `total_chunks` and `qdrant_points`. The same dict is
        written as JSON to `interaction_log`.
    """
    # 1. refresh the canonical layer (deterministic always; fuzzy on request)
    er.run(db)
    if fuzzy:
        import fuzzy_resolve
        fuzzy_resolve.run(db)

    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        _ensure_state(conn)
        last = _state_get(conn, "last_sync_ts")
        # Captured before any source rows are read, so a row modified while
        # this pass runs is still newer than the watermark next time.
        run_start = _now()

        qdrant_io.create_collection(recreate=recreate)
        qdrant_io.ensure_indexes()

        all_chunks = chunking.build_chunks(conn)
        if last is None or recreate:
            mode, target = "full", all_chunks
        else:
            # Prune chunks of records soft-deleted since the last sync.
            deleted = _deleted_source_ids(conn, last)
            if deleted:
                qdrant_io.delete_by_source_ids(deleted)
            changed = _changed_source_ids(conn, last)
            mode = "incremental"
            target = [c for c in all_chunks
                      if (c["source_model"], c["source_id"]) in changed]

        written = backfill.embed_and_upsert(target, batch=batch, progress=False)
        _state_set(conn, "last_sync_ts", run_start)

        summary = {"mode": mode, "rows_embedded": written,
                   "total_chunks": len(all_chunks),
                   "qdrant_points": qdrant_io.count()}
        conn.execute(
            """INSERT INTO interaction_log
               (id, ts, actor_type, actor_id, action, target_type, payload, source, created_at)
               VALUES (?,?,?,?,?,?,?,?,?)""",
            (str(uuid.uuid4()), _now(), "system", "ingest_sync", "ingest.sync",
             "crm_chunks", json.dumps(summary), "ingest", _now()),
        )
        # Watermark and log row are committed together; if anything above
        # raised, neither is persisted and the next run retries from `last`.
        conn.commit()
    finally:
        # Always release the connection, including on failure.
        conn.close()
    return summary


def main():
    """Parse command-line flags, run one sync pass and print a one-line summary."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--db", default=config.DEFAULT_DB)
    ap.add_argument("--recreate", action="store_true")
    ap.add_argument("--fuzzy", action="store_true")
    ap.add_argument("--batch", type=int, default=32)
    args = ap.parse_args()
    s = run(args.db, recreate=args.recreate, fuzzy=args.fuzzy, batch=args.batch)
    print(f"Sync ({s['mode']}): embedded {s['rows_embedded']} chunk(s); "
          f"{s['total_chunks']} total; Qdrant now holds {s['qdrant_points']} points.")


if __name__ == "__main__":
    main()