Files
ten31-database/backend/ingest/backfill.py
T
Keysat f357c23c75 Phase 0 complete: fuzzy entity tier, incremental sync, Start9 packaging
- Fuzzy tier (backend/ingest/fuzzy_resolve.py + llm.py): local Qwen adjudicates
  the deterministic resolver's flagged name-variant candidates; merges are
  durable via entity_merges (deterministic re-runs respect them), losers
  soft-deleted, logged. Idempotent.
- Incremental sync (backend/ingest/sync.py): re-embeds only rows changed since a
  watermark (ingest_sync_state); first run / --recreate = full. Tested full→0→1.
- Start9 packaging (start9/0.4): Dockerfile bundles ingest+mcp + fastembed/mcp;
  "Build search index" action runs the init in a subcontainer; MCP shipped as a
  manual stdio server (not a daemon); version 0.1.0:44. INGEST_PACKAGING.md.
- backfill.py: factored embed_and_upsert() shared with sync.

Verified end-to-end on synthetic data + live Sparks/Qwen/Qdrant.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:55:12 -05:00

72 lines
2.5 KiB
Python

#!/usr/bin/env python3
"""Phase-0 Workstream B — backfill the CRM into Qdrant.
Chunk -> dense (bge-m3 via Spark Control) + sparse (BM25 client-side) -> upsert
to Qdrant `crm_chunks` with payload. Idempotent: deterministic point ids mean
re-running upserts in place. Reads the CRM by file path; never sends data to Claude.

Usage:
    python3 backend/ingest/backfill.py --db data/crm_dev.db --recreate
"""
import argparse
import sqlite3
import chunking
import config
import embed
import qdrant_io
import sparse
def embed_and_upsert(chunks, batch=32, progress=True):
    """Embed (dense + sparse) and upsert a list of chunks to Qdrant.

    Shared by the full backfill and the incremental sync.

    Args:
        chunks: Sliceable sequence of chunk mappings. Each mapping must provide
            the keys read below: ``point_id``, ``text``, ``lp_id``,
            ``lp_name``, ``person_id``, ``doc_type``, ``date_ts``,
            ``source_model``, ``source_id`` and ``chunk_key``.
        batch: Number of chunks embedded and upserted per round trip. Must be
            a positive integer.
        progress: When true, print a running ``upserted x/y`` line after each
            batch.

    Returns:
        The number of points written.

    Raises:
        ValueError: If ``batch`` is less than 1.
        RuntimeError: If the dense embedder yields fewer vectors than the
            texts it was given for a batch.
    """
    # range() raises an opaque error on a zero step and yields nothing on a
    # negative one; the latter would "succeed" while writing no points at all.
    if batch < 1:
        raise ValueError(f"batch must be a positive integer, got {batch!r}")

    total = 0
    for start in range(0, len(chunks), batch):
        group = chunks[start:start + batch]
        # One dense-embedding call per batch; sparse vectors are encoded
        # per chunk below.
        dense = embed.dense_embed([c["text"] for c in group])
        points = []
        for c, dv in zip(group, dense):
            sv = sparse.encode(c["text"])
            points.append({
                "id": c["point_id"],
                "vector": {
                    "dense": dv,
                    "sparse": {"indices": sv["indices"], "values": sv["values"]},
                },
                "payload": {
                    "lp_id": c["lp_id"],
                    "lp_name": c["lp_name"],
                    "person_id": c["person_id"],
                    "doc_type": c["doc_type"],
                    "date_ts": c["date_ts"],
                    "text": c["text"],
                    "source_model": c["source_model"],
                    "source_id": c["source_id"],
                    "chunk_key": c["chunk_key"],
                },
            })
        # zip() stops at the shorter input, so a short embedder response would
        # otherwise drop chunks from the index without any signal.
        if len(points) != len(group):
            raise RuntimeError(
                f"dense embedder returned {len(points)} vectors "
                f"for {len(group)} chunks"
            )
        qdrant_io.upsert(points)
        total += len(points)
        if progress:
            print(f" upserted {total}/{len(chunks)}")
    return total
def run(db, recreate=False, batch=32):
    """Run the full backfill: build chunks from the CRM database and index them.

    Args:
        db: Path of the SQLite database, passed to ``sqlite3.connect``.
        recreate: Forwarded to ``qdrant_io.create_collection``.
        batch: Batch size forwarded to ``embed_and_upsert``.

    Returns:
        None. Progress and the final point count are printed to stdout.
    """
    conn = sqlite3.connect(db)
    try:
        # Row factory gives the chunk builder name-based column access.
        conn.row_factory = sqlite3.Row
        chunks = chunking.build_chunks(conn)
    finally:
        # Close even when chunk building raises, so the handle is not leaked.
        conn.close()
    print(f"Built {len(chunks)} chunks from {db}")

    state = qdrant_io.create_collection(recreate=recreate)
    qdrant_io.ensure_indexes()
    print(f"Collection '{config.COLLECTION}': {state}")

    embed_and_upsert(chunks, batch=batch)
    print(f"Done. Qdrant '{config.COLLECTION}' now holds {qdrant_io.count()} points.")
def main():
    """Command-line entry point: parse the flags and hand them to ``run``."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--db", default=config.DEFAULT_DB)
    parser.add_argument(
        "--recreate",
        action="store_true",
        help="drop & recreate the collection first",
    )
    parser.add_argument("--batch", type=int, default=32)
    opts = parser.parse_args()
    run(opts.db, recreate=opts.recreate, batch=opts.batch)
# Run the CLI only when executed as a script, so importing this module (e.g.
# to reuse embed_and_upsert) has no side effects.
if __name__ == "__main__":
    main()