"""Phase-0 Workstream B1 — chunk the CRM for retrieval. Maps each CRM record type to one or more chunks per docs/EMBEDDINGS.md: * one chunk per communications row (doc_type = the comm type) * one chunk per MATCHED email (doc_type = email; body only when matched) * one chunk per fundraising_investors notes LINE (the outreach log; split per line) * one chunk each for free-text fields: contacts.notes, lp_profiles.notes, opportunities (description + next_step), organizations.description Each chunk carries a canonical `lp_id` (resolved via entity_links) and a `date_ts` (epoch of the EVENT time, not created_at) so Qdrant can pre-filter and recency-rank. Entities/names/dates/types are payload (filterable); only prose is embedded. A chunk's stable `chunk_key` -> deterministic point id (uuid5), so re-ingest upserts in place (idempotent). """ import sqlite3 import uuid from datetime import datetime, timezone _NS = uuid.UUID("6ba7b811-9dad-11d1-80b4-00c04fd430c8") # uuid5 namespace for chunk ids def to_epoch(ts: str): if not ts: return None s = ts.strip().replace("Z", "+00:00") for parse in (datetime.fromisoformat,): try: dt = parse(s) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return int(dt.timestamp()) except Exception: pass # date-only fallback try: return int(datetime.strptime(ts[:10], "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()) except Exception: return None def _point_id(chunk_key: str) -> str: return str(uuid.uuid5(_NS, chunk_key)) def _mk(chunk_key, lp_id, lp_name, person_id, doc_type, date_ts, text, source_model, source_id): text = (text or "").strip() if not text or not lp_id: return None return { "chunk_key": chunk_key, "point_id": _point_id(chunk_key), "lp_id": lp_id, "lp_name": lp_name, "person_id": person_id, "doc_type": doc_type, "date_ts": date_ts, "text": text, "source_model": source_model, "source_id": source_id, } def _canon_maps(conn): """Resolution lookups from entity_links / canonical_entities.""" person_canon, org_canon, inv_canon = {}, {}, {} for r in conn.execute("SELECT source_model, source_id, canonical_id FROM entity_links"): if r["source_model"] == "contacts": person_canon[r["source_id"]] = r["canonical_id"] elif r["source_model"] == "organizations": org_canon[r["source_id"]] = r["canonical_id"] elif r["source_model"] == "fundraising_investors": inv_canon[r["source_id"]] = r["canonical_id"] name = {r["id"]: r["display_name"] for r in conn.execute("SELECT id, display_name FROM canonical_entities")} contact_org = {r["id"]: r["organization_id"] for r in conn.execute("SELECT id, organization_id FROM contacts")} return person_canon, org_canon, inv_canon, name, contact_org def _contact_lp(cid, person_canon, org_canon, name, contact_org): """Best lp_id for a contact-anchored chunk: the firm if known, else the person.""" person = person_canon.get(cid) firm = org_canon.get(contact_org.get(cid)) lp = firm or person return lp, name.get(lp), person def build_chunks(conn): person_canon, org_canon, inv_canon, name, contact_org = _canon_maps(conn) chunks = [] # communications for r in conn.execute("""SELECT id, contact_id, type, subject, body, outcome, next_action, communication_date FROM communications WHERE deleted_at IS NULL"""): lp, lp_name, person = _contact_lp(r["contact_id"], person_canon, org_canon, name, contact_org) parts = [p for p in (r["subject"], r["body"], r["outcome"], r["next_action"]) if (p or "").strip()] chunks.append(_mk(f"communications:{r['id']}", lp, lp_name, person, r["type"] or "note", to_epoch(r["communication_date"]), "\n".join(parts), "communications", r["id"])) # contacts.notes for r in conn.execute("SELECT id, notes, updated_at FROM contacts WHERE notes IS NOT NULL AND notes <> '' AND deleted_at IS NULL"): lp, lp_name, person = _contact_lp(r["id"], person_canon, org_canon, name, contact_org) chunks.append(_mk(f"contacts.notes:{r['id']}", lp, lp_name, person, "contact_note", to_epoch(r["updated_at"]), r["notes"], "contacts", r["id"])) # lp_profiles.notes for r in conn.execute("""SELECT lp.id, lp.contact_id, lp.notes, lp.updated_at FROM lp_profiles lp WHERE lp.notes IS NOT NULL AND lp.notes <> '' AND lp.deleted_at IS NULL"""): lp, lp_name, person = _contact_lp(r["contact_id"], person_canon, org_canon, name, contact_org) chunks.append(_mk(f"lp_profiles.notes:{r['id']}", lp, lp_name, person, "lp_note", to_epoch(r["updated_at"]), r["notes"], "lp_profiles", r["id"])) # opportunities (description + next_step) for r in conn.execute("""SELECT id, contact_id, name, description, next_step, updated_at FROM opportunities WHERE deleted_at IS NULL"""): lp, lp_name, person = _contact_lp(r["contact_id"], person_canon, org_canon, name, contact_org) parts = [p for p in (r["name"], r["description"], r["next_step"]) if (p or "").strip()] chunks.append(_mk(f"opportunities:{r['id']}", lp, lp_name, person, "opportunity", to_epoch(r["updated_at"]), "\n".join(parts), "opportunities", r["id"])) # organizations.description for r in conn.execute("""SELECT id, description, updated_at FROM organizations WHERE description IS NOT NULL AND description <> '' AND deleted_at IS NULL"""): lp = org_canon.get(r["id"]) chunks.append(_mk(f"organizations.description:{r['id']}", lp, name.get(lp), None, "org_note", to_epoch(r["updated_at"]), r["description"], "organizations", r["id"])) # fundraising_investors.notes — running outreach log, split per non-empty line for r in conn.execute("""SELECT id, notes, updated_at FROM fundraising_investors WHERE notes IS NOT NULL AND notes <> ''"""): lp = inv_canon.get(r["id"]) for i, line in enumerate(str(r["notes"]).splitlines()): if line.strip(): chunks.append(_mk(f"fundraising_investors.notes:{r['id']}:{i}", lp, name.get(lp), None, "outreach_note", to_epoch(r["updated_at"]), line, "fundraising_investors", r["id"])) # MATCHED emails (only matched rows carry a body; key lp via email_investor_links) if _has_table(conn, "emails") and _has_table(conn, "email_investor_links"): for r in conn.execute("""SELECT id, subject, body_text, snippet, sent_at FROM emails WHERE is_matched=1"""): lp, lp_name = _email_lp(conn, r["id"], inv_canon, org_canon, person_canon, name) text = "\n".join(p for p in (r["subject"], r["body_text"] or r["snippet"]) if (p or "").strip()) chunks.append(_mk(f"emails:{r['id']}", lp, lp_name, None, "email", to_epoch(r["sent_at"]), text, "emails", r["id"])) return [c for c in chunks if c] def _has_table(conn, name): return conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (name,)).fetchone() is not None def _email_lp(conn, email_id, inv_canon, org_canon, person_canon, name): """Resolve a matched email's lp_id via email_investor_links, precedence: fundraising_investor -> contact -> organization.""" row = conn.execute("""SELECT fundraising_investor_id, contact_id, organization_id FROM email_investor_links WHERE email_id=? ORDER BY match_confidence DESC LIMIT 1""", (email_id,)).fetchone() if not row: return None, None lp = (inv_canon.get(row["fundraising_investor_id"]) or person_canon.get(row["contact_id"]) or org_canon.get(row["organization_id"])) return lp, name.get(lp) if __name__ == "__main__": import argparse from collections import Counter from config import DEFAULT_DB ap = argparse.ArgumentParser() ap.add_argument("--db", default=DEFAULT_DB) args = ap.parse_args() conn = sqlite3.connect(args.db) conn.row_factory = sqlite3.Row chunks = build_chunks(conn) print(f"{len(chunks)} chunks from {args.db}") for dt, n in Counter(c["doc_type"] for c in chunks).most_common(): print(f" {dt:<16} {n}") unresolved = sum(1 for c in chunks if not c["lp_id"]) print(f" (all chunks have an lp_id: {unresolved == 0})") print("\nSample chunk:") s = chunks[0] print({k: (v[:80] + '…' if k == 'text' and v and len(v) > 80 else v) for k, v in s.items()})