Files
ten31-database/backend/ingest/sync.py
T
Keysat f357c23c75 Phase 0 complete: fuzzy entity tier, incremental sync, Start9 packaging
- Fuzzy tier (backend/ingest/fuzzy_resolve.py + llm.py): local Qwen adjudicates
  the deterministic resolver's flagged name-variant candidates; merges are
  durable via entity_merges (deterministic re-runs respect them), losers
  soft-deleted, logged. Idempotent.
- Incremental sync (backend/ingest/sync.py): re-embeds only rows changed since a
  watermark (ingest_sync_state); first run / --recreate = full. Tested full→0→1.
- Start9 packaging (start9/0.4): Dockerfile bundles ingest+mcp + fastembed/mcp;
  "Build search index" action runs the init in a subcontainer; MCP shipped as a
  manual stdio server (not a daemon); version 0.1.0:44. INGEST_PACKAGING.md.
- backfill.py: factored embed_and_upsert() shared with sync.

Verified end-to-end on synthetic data + live Sparks/Qwen/Qdrant.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:55:12 -05:00

127 lines
4.9 KiB
Python

#!/usr/bin/env python3
"""Phase-0 Workstream B4 — incremental, idempotent CRM -> Qdrant sync.
One command that keeps the index fresh:
1. Re-run deterministic entity resolution (cheap, idempotent, respects durable
fuzzy merges). Optionally re-run the local-Qwen fuzzy tier (--fuzzy).
2. Re-embed ONLY the source rows changed since the last sync (by updated_at);
the first run (or --recreate) is a full backfill.
3. Upsert with deterministic point ids (overwrite in place) and advance the
watermark. Logged to interaction_log.
Idempotent: re-running with no CRM changes embeds nothing. Watermark lives in an
`ingest_sync_state` table the pipeline owns.
python3 backend/ingest/sync.py --db data/crm_dev.db # incremental (full on first run)
python3 backend/ingest/sync.py --db data/crm_dev.db --recreate # force full rebuild
python3 backend/ingest/sync.py --db data/crm_dev.db --fuzzy # also run the Qwen fuzzy tier
LIMITATION: the CRM hard-deletes today, so a removed row's chunk is not pruned
incrementally (no tombstone). Until the DELETE handlers honor `deleted_at`, run a
periodic `--recreate` (or `backfill.py --recreate`) to drop orphans. Structural
entity-id changes (merges) are likewise best followed by a periodic full rebuild.
"""
import argparse
import json
import sqlite3
import uuid
from datetime import datetime, timezone
import backfill
import chunking
import config
import entity_resolution as er
import qdrant_io
# (table, source_model) pairs scanned for rows whose updated_at is newer than
# the sync watermark. Each table currently uses its own name as the
# source_model label, so the pairs are derived from a single list of names.
_CHANGE_TABLES = [
    (name, name)
    for name in (
        "communications",
        "contacts",
        "lp_profiles",
        "opportunities",
        "organizations",
        "fundraising_investors",
    )
]
def _now():
    """Return the current UTC time as a naive ISO-8601 string suffixed with "Z".

    The trailing "Z" mirrors the CRM's updated_at format (server.now() in
    server.py) so the watermark compares correctly against source-row
    updated_at values.
    """
    # Drop tzinfo first so isoformat() does not append "+00:00" before the "Z".
    utc_naive = datetime.now(timezone.utc).replace(tzinfo=None)
    return f"{utc_naive.isoformat()}Z"
def _ensure_state(conn):
    """Create the `ingest_sync_state` key/value table if it is not there yet.

    Args:
        conn: open sqlite3 connection to the CRM database.
    """
    ddl = (
        "CREATE TABLE IF NOT EXISTS ingest_sync_state (\n"
        "key TEXT PRIMARY KEY, value TEXT, updated_at TEXT DEFAULT (datetime('now')))"
    )
    conn.execute(ddl)
def _state_get(conn, key):
    """Look up one value in `ingest_sync_state`.

    Args:
        conn: open sqlite3 connection holding the `ingest_sync_state` table.
        key: state key to read.

    Returns:
        The stored value, or None when no row exists for `key`.
    """
    row = conn.execute(
        "SELECT value FROM ingest_sync_state WHERE key=?", (key,)
    ).fetchone()
    if not row:
        return None
    return row[0]
def _state_set(conn, key, value):
    """Insert or overwrite one `ingest_sync_state` entry.

    The row's updated_at is stamped with the current `_now()` value on both
    the insert and the conflict-update path. The caller is responsible for
    committing.

    Args:
        conn: open sqlite3 connection holding the `ingest_sync_state` table.
        key: state key to write.
        value: value to store under `key`.
    """
    upsert_sql = (
        "INSERT INTO ingest_sync_state (key, value, updated_at) VALUES (?,?,?)\n"
        "ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at"
    )
    params = (key, value, _now())
    conn.execute(upsert_sql, params)
def _changed_source_ids(conn, since):
    """Collect the (source_model, id) pairs of rows modified after `since`.

    Args:
        conn: sqlite3 connection whose rows support name-based access
            (row["id"]), e.g. row_factory = sqlite3.Row.
        since: watermark; rows with updated_at strictly greater than this
            value count as changed.

    Returns:
        A set of (source_model, id) tuples.
    """
    hits = set()
    for table_name, source_model in _CHANGE_TABLES:
        query = f"SELECT id FROM {table_name} WHERE updated_at > ?"
        hits.update((source_model, row["id"]) for row in conn.execute(query, (since,)))

    # The emails table is optional; only matched emails are considered.
    if chunking._has_table(conn, "emails"):
        email_rows = conn.execute(
            "SELECT id FROM emails WHERE updated_at > ? AND is_matched=1", (since,)
        )
        hits.update(("emails", row["id"]) for row in email_rows)
    return hits
def run(db, recreate=False, fuzzy=False, batch=32):
    """Run one sync pass: resolve entities, embed changed chunks, log the result.

    Args:
        db: path of the SQLite CRM database.
        recreate: when true, the Qdrant collection is recreated and every
            chunk is embedded again (full mode).
        fuzzy: when true, also run `fuzzy_resolve.run` after the deterministic
            entity resolution.
        batch: batch size forwarded to `backfill.embed_and_upsert`.

    Returns:
        A summary dict with keys "mode" ("full" or "incremental"),
        "rows_embedded" (the value returned by `backfill.embed_and_upsert`),
        "total_chunks" and "qdrant_points".

    Raises:
        Whatever the resolver, chunking, Qdrant or SQLite calls raise. The
        connection is closed in every case, and because the watermark and the
        interaction_log row are committed only at the very end, a failed run
        leaves the previous watermark in place.
    """
    # 1. refresh the canonical layer (deterministic always; fuzzy on request)
    er.run(db)
    if fuzzy:
        # Imported only when the fuzzy tier is actually requested.
        import fuzzy_resolve
        fuzzy_resolve.run(db)

    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        _ensure_state(conn)
        last = _state_get(conn, "last_sync_ts")
        # Captured before chunks are built: a row modified while this run is
        # in flight gets an updated_at later than the stored watermark and is
        # therefore picked up again by the next run.
        run_start = _now()

        qdrant_io.create_collection(recreate=recreate)
        qdrant_io.ensure_indexes()

        # 2. pick what to embed: everything on first run / --recreate,
        #    otherwise only chunks whose source row changed since `last`.
        all_chunks = chunking.build_chunks(conn)
        if last is None or recreate:
            mode, target = "full", all_chunks
        else:
            changed = _changed_source_ids(conn, last)
            mode = "incremental"
            target = [c for c in all_chunks
                      if (c["source_model"], c["source_id"]) in changed]

        # 3. upsert, advance the watermark, and record the run.
        written = backfill.embed_and_upsert(target, batch=batch, progress=False)
        _state_set(conn, "last_sync_ts", run_start)
        summary = {"mode": mode, "rows_embedded": written, "total_chunks": len(all_chunks),
                   "qdrant_points": qdrant_io.count()}
        conn.execute("""INSERT INTO interaction_log
(id, ts, actor_type, actor_id, action, target_type, payload, source, created_at)
VALUES (?,?,?,?,?,?,?,?,?)""",
                     (str(uuid.uuid4()), _now(), "system", "ingest_sync", "ingest.sync", "crm_chunks",
                      json.dumps(summary), "ingest", _now()))
        conn.commit()
    finally:
        # Always release the SQLite handle; closing without a commit discards
        # any pending writes, so an aborted run does not advance the watermark.
        conn.close()
    return summary
def main():
    """Command-line entry point: parse the flags, run one sync, print a summary."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", default=config.DEFAULT_DB)
    for switch in ("--recreate", "--fuzzy"):
        parser.add_argument(switch, action="store_true")
    parser.add_argument("--batch", type=int, default=32)
    opts = parser.parse_args()

    result = run(opts.db, recreate=opts.recreate, fuzzy=opts.fuzzy, batch=opts.batch)
    report = (f"Sync ({result['mode']}): embedded {result['rows_embedded']} chunk(s); "
              f"{result['total_chunks']} total; Qdrant now holds {result['qdrant_points']} points.")
    print(report)


# Run the sync only when executed as a script, not when imported.
if __name__ == "__main__":
    main()