Files
Keysat c7ce44d963 Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server
Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:13:35 -05:00

110 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""Phase-0 retrieval — thin wrappers over Spark Control /api/search.
These are the retrieval modes the CRM MCP server (Workstream C) will expose:
* semantic_search — dense only (omit sparse), high recall
* hybrid_search — dense + BM25 sparse (RRF) + rerank; best for entity queries
* keyword_search — lean on the sparse leg
All support a Qdrant `filter` (e.g. lp_id / date_ts range) to pre-filter.
`--demo` runs an entity-heavy query in dense-only vs hybrid to show the BM25
lexical leg surfacing the right LP. The query's sparse vector uses the SAME
encoder as ingest (sparse.encode).
"""
import argparse
import config
import http_util
import sparse
def _search(query, sparse_vec=None, rerank=False, top_k=5, lp_id=None, retrieve_n=80, filt=None):
    """POST one request to Spark Control's /api/search and return its hits.

    Args:
        query: Query text, sent as the request's "query" field.
        sparse_vec: Optional mapping with "indices" and "values" keys. When
            given it is forwarded as the request's "sparse" field; when None
            no sparse field is sent at all.
        rerank: Forwarded as the request's "rerank" flag.
        top_k: Forwarded as "top_k".
        lp_id: Convenience pre-filter. When truthy (and ``filt`` is not given)
            the request carries a must-match condition on the "lp_id" payload key.
        retrieve_n: Forwarded as "retrieve_n".
        filt: Raw Qdrant filter object, sent verbatim. Takes precedence over
            ``lp_id``.

    Returns:
        The "data" value of the response, or an empty list when that key is
        missing or null.

    Raises:
        RuntimeError: If the endpoint answers with a status other than 200.
    """
    body = {
        "query": query,
        "collection": config.COLLECTION,
        "top_k": top_k,
        "retrieve_n": retrieve_n,
        "fusion": "rrf",
        "text_field": "text",
        "with_payload": True,
        "rerank": rerank,
    }
    if sparse_vec is not None:
        body["sparse"] = {"indices": sparse_vec["indices"], "values": sparse_vec["values"]}
    # An explicit raw Qdrant filter (filt) wins; otherwise build one from lp_id.
    if filt is not None:
        body["filter"] = filt
    elif lp_id:
        body["filter"] = {"must": [{"key": "lp_id", "match": {"value": lp_id}}]}
    status, data = http_util.request(
        "POST",
        f"{config.SPARK_CONTROL_URL}/api/search",
        body,
        verify=config.SPARK_VERIFY_TLS,
    )
    if status != 200:
        raise RuntimeError(f"/api/search -> {status}: {data}")
    # dict.get's default only applies to a *missing* key; `or []` also covers
    # an explicit null so callers can always iterate the result.
    return data.get("data") or []
def semantic_search(query, **kw):
    """Dense-only search: no sparse vector is sent with the request.

    Reranking is off unless the caller passes ``rerank``; every other keyword
    argument is handed through to ``_search`` unchanged.
    """
    use_rerank = kw.pop("rerank", False)
    return _search(query, sparse_vec=None, rerank=use_rerank, **kw)
def hybrid_search(query, **kw):
    """Hybrid search: the query plus its sparse encoding, fused by ``_search``.

    The sparse vector comes from ``sparse.encode(query)``. Reranking is on
    unless the caller passes ``rerank``; remaining keyword arguments go to
    ``_search`` unchanged.
    """
    query_sparse = sparse.encode(query)
    use_rerank = kw.pop("rerank", True)
    return _search(query, sparse_vec=query_sparse, rerank=use_rerank, **kw)
def keyword_search(query, **kw):
    """Keyword-oriented search that sends the query's sparse encoding.

    NOTE(review): as written this issues exactly the same request as
    ``hybrid_search`` (sparse vector from ``sparse.encode``, rerank defaulting
    to True) — confirm whether a sparse-weighted variant is intended.
    """
    use_rerank = kw.pop("rerank", True)
    encoded = sparse.encode(query)
    return _search(query, sparse_vec=encoded, rerank=use_rerank, **kw)
def _row(r):
    """Format one search hit as a single display line.

    Args:
        r: Hit mapping. Reads its optional "payload" mapping (keys "lp_name",
            "doc_type", "text") and its optional top-level "text".

    Returns:
        ``"<lp_name padded to 22> [<doc_type padded to 13>] <text>"`` where the
        text has newlines replaced by spaces and is cut to 58 characters.
        A missing or null lp_name / doc_type is shown as "?".
    """
    payload = r.get("payload") or {}

    def _label(key):
        # dict.get's default does not apply when the key is present with a
        # None value, and None rejects the "<22" format spec — so normalise
        # to a string before padding.
        value = payload.get(key)
        return "?" if value is None else str(value)

    # The top-level text wins over the payload copy; str() guards .replace
    # against a non-string value.
    text = str(r.get("text") or payload.get("text") or "").replace("\n", " ")
    return f"{_label('lp_name'):<22} [{_label('doc_type'):<13}] {text[:58]}"
def _print(title, rows):
    """Write a titled, 1-based numbered list of hits to stdout.

    An empty ``rows`` prints a "(no results)" line under the title. Each hit
    line shows its signed score to three decimals (0 when the hit has no
    "score" key) followed by the ``_row`` rendering of the hit.
    """
    print(f"\n {title}")
    if not rows:
        print(" (no results)")
    position = 0
    for hit in rows:
        position += 1
        score = hit.get('score', 0)
        print(f" {position}. score={score:+.3f} {_row(hit)}")
def demo():
    """Run the canned comparison: dense-only vs hybrid for one entity-heavy query.

    Prints both result lists, the rank at which the target LP first appears in
    each, and — if the hybrid results yield an lp_id for the target — a third
    hybrid run pre-filtered to that lp_id.
    """
    target = "Cedar Point Capital"
    q = "Fund III diligence and wire timeline for Cedar Point"

    def _payload_of(hit):
        # A hit may lack "payload" or carry it as None; treat both as empty.
        return hit.get("payload", {}) or {}

    def first_rank(rows):
        # 1-based position of the first hit belonging to the target LP, else None.
        return next(
            (pos for pos, hit in enumerate(rows, 1)
             if _payload_of(hit).get("lp_name") == target),
            None,
        )

    print(f"QUERY: {q!r}\nTarget LP: {target}")
    dense = semantic_search(q, top_k=5)
    # rerank off to isolate the BM25 leg
    hybrid = hybrid_search(q, top_k=5, rerank=False)
    _print("dense-only (semantic):", dense)
    _print("hybrid (dense + BM25 RRF):", hybrid)
    print(f"\n First '{target}' chunk — dense rank: {first_rank(dense)}, hybrid rank: {first_rank(hybrid)}")

    # Pre-filter demo: same query, restricted to one LP's chunks. The lp_id is
    # taken from the first hybrid hit that belongs to the target LP.
    lp_id = next(
        (_payload_of(hit).get("lp_id") for hit in hybrid
         if _payload_of(hit).get("lp_name") == target),
        None,
    )
    if lp_id:
        heading = f"hybrid + payload pre-filter (lp_id={lp_id}):"
        _print(heading, hybrid_search(q, top_k=5, rerank=True, lp_id=lp_id))
def main():
    """Command-line entry point.

    With ``--demo``, or when no query is supplied, runs ``demo()`` and returns
    its result. Otherwise runs the query in the selected ``--mode``
    (default "hybrid") with ``--top-k`` and optional ``--lp-id``, and prints
    the hits.
    """
    modes = {
        "semantic": semantic_search,
        "hybrid": hybrid_search,
        "keyword": keyword_search,
    }

    parser = argparse.ArgumentParser()
    parser.add_argument("query", nargs="?")
    parser.add_argument("--mode", choices=list(modes), default="hybrid")
    parser.add_argument("--top-k", type=int, default=5)
    parser.add_argument("--lp-id")
    parser.add_argument("--demo", action="store_true")
    opts = parser.parse_args()

    # No usable query (or an explicit --demo) falls back to the canned demo.
    run_query = bool(opts.query) and not opts.demo
    if not run_query:
        return demo()

    search = modes[opts.mode]
    hits = search(opts.query, top_k=opts.top_k, lp_id=opts.lp_id)
    _print(f"{opts.mode}: {opts.query!r}", hits)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()