#!/usr/bin/env python3
"""Phase-0 retrieval — thin wrappers over Spark Control /api/search.

These are the retrieval modes the CRM MCP server (Workstream C) will expose:

  * semantic_search — dense only (omit sparse), high recall
  * hybrid_search   — dense + BM25 sparse (RRF) + rerank; best for entity queries
  * keyword_search  — intended to lean on the sparse leg; currently sends the
                      same request as hybrid_search (see its docstring)

All of them accept either a raw Qdrant filter (`filt`, e.g. an lp_id match or
a date_ts range) or a plain `lp_id` to pre-filter candidates.

`--demo` runs an entity-heavy query in dense-only vs hybrid mode to show the
BM25 lexical leg surfacing the right LP. The query's sparse vector uses the
SAME encoder as ingest (sparse.encode).
"""
import argparse

import config
import http_util
import sparse


def _search(query, sparse_vec=None, rerank=False, top_k=5, lp_id=None,
            retrieve_n=80, filt=None):
    """POST one search request to Spark Control and return its hit list.

    Args:
        query: Query text, sent as-is in the request body.
        sparse_vec: Optional mapping with "indices" and "values" (as produced
            by sparse.encode). When None, no "sparse" key is sent.
        rerank: Forwarded as the request's "rerank" flag.
        top_k: Forwarded as "top_k" (number of hits requested).
        lp_id: Convenience pre-filter that builds a Qdrant `must` match on the
            payload key "lp_id". Ignored when `filt` is given.
        retrieve_n: Forwarded as "retrieve_n" (presumably the candidate pool
            size before fusion/rerank — confirm against the server).
        filt: Raw Qdrant filter dict; takes precedence over `lp_id`.

    Returns:
        The response's "data" list, or an empty list if it is missing or null.

    Raises:
        RuntimeError: If /api/search answers with a non-200 status.
    """
    body = {
        "query": query,
        "collection": config.COLLECTION,
        "top_k": top_k,
        "retrieve_n": retrieve_n,
        "fusion": "rrf",
        "text_field": "text",
        "with_payload": True,
        "rerank": rerank,
    }
    if sparse_vec is not None:
        body["sparse"] = {"indices": sparse_vec["indices"],
                          "values": sparse_vec["values"]}
    # An explicit raw Qdrant filter (filt) wins; otherwise build one from lp_id.
    # Test against None/"" instead of truthiness so a numeric lp_id of 0 still
    # produces a filter.
    if filt is not None:
        body["filter"] = filt
    elif lp_id not in (None, ""):
        body["filter"] = {"must": [{"key": "lp_id", "match": {"value": lp_id}}]}
    status, data = http_util.request(
        "POST", f"{config.SPARK_CONTROL_URL}/api/search", body,
        verify=config.SPARK_VERIFY_TLS)
    if status != 200:
        raise RuntimeError(f"/api/search -> {status}: {data}")
    # A present-but-null "data" field would otherwise come back as None and
    # crash every caller that iterates the result.
    return data.get("data") or []


def semantic_search(query, **kw):
    """Dense-only search: no sparse vector is sent; rerank defaults to off.

    Remaining keyword arguments (top_k, lp_id, retrieve_n, filt) are forwarded
    to _search.
    """
    return _search(query, sparse_vec=None, rerank=kw.pop("rerank", False), **kw)


def hybrid_search(query, **kw):
    """Dense + BM25 sparse search fused with RRF; rerank defaults to on.

    Remaining keyword arguments (top_k, lp_id, retrieve_n, filt) are forwarded
    to _search.
    """
    return _search(query, sparse_vec=sparse.encode(query),
                   rerank=kw.pop("rerank", True), **kw)


def keyword_search(query, **kw):
    """Search meant to lean on the sparse (BM25) leg; rerank defaults to on.

    NOTE(review): this currently builds exactly the same request as
    hybrid_search (dense + sparse, RRF fusion), so the sparse leg is not
    weighted any more heavily. TODO: confirm whether /api/search offers a
    sparse-only or sparse-weighted mode and use it here.
    """
    return _search(query, sparse_vec=sparse.encode(query),
                   rerank=kw.pop("rerank", True), **kw)


def _row(r):
    """Format one hit as a fixed-width 'lp_name [doc_type] text' line."""
    p = r.get("payload") or {}
    # `or "?"` rather than a .get() default: an explicit null in the payload
    # would otherwise hit the width format spec, which raises TypeError on None.
    name = str(p.get("lp_name") or "?")
    doc_type = str(p.get("doc_type") or "?")
    text = (r.get("text") or p.get("text") or "").replace("\n", " ")
    return f"{name:<22} [{doc_type:<13}] {text[:58]}"


def _print(title, rows):
    """Print a titled, 1-based numbered list of hits, or a placeholder if empty."""
    print(f"\n {title}")
    rows = rows or []
    if not rows:
        print(" (no results)")
    for i, r in enumerate(rows, 1):
        # A null score would make the float format spec raise; show it as 0.
        score = r.get("score") or 0
        print(f" {i}. score={score:+.3f} {_row(r)}")


def demo():
    """Compare dense-only vs hybrid ranking for an entity-heavy query.

    Prints both result lists and the rank of the first chunk belonging to the
    target LP in each. If the hybrid results contain that LP, it also runs the
    hybrid search again, pre-filtered to that LP's lp_id, with rerank on.
    """
    target = "Cedar Point Capital"
    q = "Fund III diligence and wire timeline for Cedar Point"
    print(f"QUERY: {q!r}\nTarget LP: {target}")

    dense = semantic_search(q, top_k=5)
    hybrid = hybrid_search(q, top_k=5, rerank=False)  # rerank off to isolate the BM25 leg
    _print("dense-only (semantic):", dense)
    _print("hybrid (dense + BM25 RRF):", hybrid)

    def payload_of(row):
        return row.get("payload") or {}

    def first_rank(rows):
        """Return the 1-based rank of the first hit for `target`, or None."""
        return next((i for i, r in enumerate(rows, 1)
                     if payload_of(r).get("lp_name") == target), None)

    print(f"\n First '{target}' chunk — dense rank: {first_rank(dense)}, hybrid rank: {first_rank(hybrid)}")

    # Pre-filter demo: same query, restricted to one LP's chunks. The lp_id
    # comes from the first hybrid hit that belongs to the target LP.
    lp_id = next((payload_of(r).get("lp_id") for r in hybrid
                  if payload_of(r).get("lp_name") == target), None)
    if lp_id not in (None, ""):
        _print(f"hybrid + payload pre-filter (lp_id={lp_id}):",
               hybrid_search(q, top_k=5, rerank=True, lp_id=lp_id))


def main():
    """CLI entry point: run one search, or the demo when no query is given."""
    ap = argparse.ArgumentParser(
        description="Phase-0 retrieval over Spark Control /api/search.")
    ap.add_argument("query", nargs="?", help="query text; omit to run the demo")
    ap.add_argument("--mode", choices=["semantic", "hybrid", "keyword"],
                    default="hybrid")
    ap.add_argument("--top-k", type=int, default=5)
    ap.add_argument("--lp-id", help="restrict hits to this payload lp_id")
    ap.add_argument("--demo", action="store_true",
                    help="run the dense-vs-hybrid demo")
    args = ap.parse_args()

    if args.demo or not args.query:
        return demo()

    fn = {"semantic": semantic_search,
          "hybrid": hybrid_search,
          "keyword": keyword_search}[args.mode]
    _print(f"{args.mode}: {args.query!r}",
           fn(args.query, top_k=args.top_k, lp_id=args.lp_id))


if __name__ == "__main__":
    main()