Files
ten31-database/backend/ingest/chunking.py
T
Keysat 1564c087bf Remove Instructions/Feedback + lp_profiles; sync retry, purge, mobile fixes (v0.1.0:104)
Removals (net -570 lines):
- Delete the Instructions and Feedback (feature_requests) pages + backend.
- Retire lp_profiles + investor_type across server, ingest, and seeds; migration
  0008 drops both empty tables (a sanctioned one-off exception to
  never-hard-delete). 0001's lp_profiles ALTER is removed so a fresh DB doesn't
  break the migration chain (live DBs already applied it).

Fixes:
- Email sync: a transient timeout no longer terminally parks a mailbox; the
  scheduler retries mailboxes in the 'retrying' state every cycle and
  re-includes errored accounts on an hourly backoff, so stuck mailboxes
  self-heal.
- Mobile Contacts: page through the full directory (server caps 500/page) -- one
  fetch silently truncated at 720, hiding people from the list and from search.
- Mobile email review: clock icon to set a reminder inline; approval cards show
  date/time.

New:
- Admin-only purge of soft-deleted rows (Settings -> Admin; type-to-confirm,
  refuses any row still linked to live data).

Tests: 45/45 (adds test_sync_ready + test_purge_soft_deleted). Reviewer pass
applied (NULL reminders.contact_id on contact purge). Bumped to v0.1.0:104.
2026-06-20 20:06:11 -05:00

178 lines
8.2 KiB
Python

"""Phase-0 Workstream B1 — chunk the CRM for retrieval.
Maps each CRM record type to one or more chunks per docs/EMBEDDINGS.md:
* one chunk per communications row (doc_type = the comm type)
* one chunk per MATCHED email (doc_type = email; body only when matched)
* one chunk per fundraising_investors notes LINE (the outreach log; split per line)
* one chunk each for free-text fields: contacts.notes,
opportunities (name + description + next_step), organizations.description
Each chunk carries a canonical `lp_id` (resolved via entity_links) and a `date_ts`
(epoch of the EVENT time, not created_at) so Qdrant can pre-filter and recency-rank.
Entities/names/dates/types are payload (filterable); only prose is embedded.
A chunk's stable `chunk_key` -> deterministic point id (uuid5), so re-ingest
upserts in place (idempotent).
"""
import sqlite3
import uuid
from datetime import datetime, timezone
# Fixed uuid5 namespace for chunk point ids. uuid.NAMESPACE_URL is exactly
# UUID("6ba7b811-9dad-11d1-80b4-00c04fd430c8"), so every derived id is unchanged.
_NS = uuid.NAMESPACE_URL
def to_epoch(ts: str):
    """Convert a timestamp to integer epoch seconds (UTC), or None.

    The value is parsed with ``datetime.fromisoformat``. A trailing ``Z`` is
    rewritten to ``+00:00`` because Python < 3.11 rejects it. A naive
    result is treated as UTC. If full parsing fails, the first 10 characters
    of the stripped value are tried as a ``YYYY-MM-DD`` date at midnight UTC.

    Args:
        ts: The timestamp, normally an ISO-8601 string. Non-str values (for
            example a number stored in a loosely typed SQLite column) are
            stringified instead of crashing.

    Returns:
        Epoch seconds as an int. Returns None for empty/None input or for a
        value that matches neither format. Bad input does not raise.
    """
    if not ts:
        return None
    s = str(ts).strip()
    # Only a trailing "Z" is a UTC designator; leave the rest of the string alone.
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(s)
    except ValueError:
        dt = None
    if dt is not None:
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return int(dt.timestamp())
    # Date-only fallback. Slice the *stripped* value so that leading
    # whitespace does not shift the YYYY-MM-DD window.
    try:
        day = datetime.strptime(s[:10], "%Y-%m-%d")
    except ValueError:
        return None
    return int(day.replace(tzinfo=timezone.utc).timestamp())
def _point_id(chunk_key: str) -> str:
    """Return the deterministic point id for *chunk_key*.

    The id is the uuid5 of the key under the module namespace ``_NS``, in
    string form, so the same key always maps to the same point.
    """
    point_uuid = uuid.uuid5(_NS, chunk_key)
    return f"{point_uuid}"
def _mk(chunk_key, lp_id, lp_name, person_id, doc_type, date_ts, text, source_model, source_id):
    """Assemble one chunk payload, or return None when it should be skipped.

    The chunk is skipped (None) when its text is empty or whitespace-only,
    or when ``lp_id`` is falsy. Otherwise the stored text is stripped.
    ``point_id`` is derived from ``chunk_key`` so that the same key always
    yields the same point.
    """
    body = (text or "").strip()
    if lp_id and body:
        return dict(
            chunk_key=chunk_key,
            point_id=_point_id(chunk_key),
            lp_id=lp_id,
            lp_name=lp_name,
            person_id=person_id,
            doc_type=doc_type,
            date_ts=date_ts,
            text=body,
            source_model=source_model,
            source_id=source_id,
        )
    return None
def _canon_maps(conn):
    """Load the resolution lookups from entity_links / canonical_entities.

    Rows are read by column name, so *conn* must use a name-addressable row
    factory such as ``sqlite3.Row``.

    Returns ``(person_canon, org_canon, inv_canon, name, contact_org)``:
        person_canon / org_canon / inv_canon: ``source_id -> canonical_id``
            for entity_links rows whose source_model is ``contacts``,
            ``organizations`` or ``fundraising_investors``. Any other
            source_model is ignored.
        name: ``canonical_entities.id -> display_name``.
        contact_org: ``contacts.id -> organization_id`` over every contacts
            row. There is no deleted_at filter here.
    """
    person_canon, org_canon, inv_canon = {}, {}, {}
    by_model = {
        "contacts": person_canon,
        "organizations": org_canon,
        "fundraising_investors": inv_canon,
    }
    for link in conn.execute("SELECT source_model, source_id, canonical_id FROM entity_links"):
        bucket = by_model.get(link["source_model"])
        if bucket is not None:
            bucket[link["source_id"]] = link["canonical_id"]

    names = {}
    for entity in conn.execute("SELECT id, display_name FROM canonical_entities"):
        names[entity["id"]] = entity["display_name"]

    contact_org = {}
    for contact in conn.execute("SELECT id, organization_id FROM contacts"):
        contact_org[contact["id"]] = contact["organization_id"]

    return person_canon, org_canon, inv_canon, names, contact_org
def _contact_lp(cid, person_canon, org_canon, name, contact_org):
    """Pick the lp for a chunk anchored on contact *cid*.

    The contact's organization's canonical id wins when it resolves.
    Otherwise the contact's own canonical id is used.

    Returns ``(lp_id, lp_name, person_id)``. ``person_id`` is always the
    contact's own canonical id, which may be None.
    """
    person = person_canon.get(cid)
    org_id = contact_org.get(cid)
    firm = org_canon.get(org_id)
    lp = firm if firm else person
    return lp, name.get(lp), person
def _nonblank_join(*parts):
    """Newline-join the parts that are non-empty after stripping (None parts are skipped)."""
    return "\n".join(p for p in parts if (p or "").strip())


def _communication_chunks(conn, contact_lp):
    """Yield one chunk per live communications row (doc_type = its type, default "note")."""
    for r in conn.execute(
        """SELECT id, contact_id, type, subject, body, outcome, next_action, communication_date
        FROM communications WHERE deleted_at IS NULL"""
    ):
        lp, lp_name, person = contact_lp(r["contact_id"])
        yield _mk(
            f"communications:{r['id']}", lp, lp_name, person,
            r["type"] or "note", to_epoch(r["communication_date"]),
            _nonblank_join(r["subject"], r["body"], r["outcome"], r["next_action"]),
            "communications", r["id"],
        )


def _contact_note_chunks(conn, contact_lp):
    """Yield one chunk per live contact with non-empty notes (doc_type = contact_note)."""
    for r in conn.execute(
        "SELECT id, notes, updated_at FROM contacts "
        "WHERE notes IS NOT NULL AND notes <> '' AND deleted_at IS NULL"
    ):
        lp, lp_name, person = contact_lp(r["id"])
        yield _mk(
            f"contacts.notes:{r['id']}", lp, lp_name, person,
            "contact_note", to_epoch(r["updated_at"]), r["notes"], "contacts", r["id"],
        )


def _opportunity_chunks(conn, contact_lp):
    """Yield one chunk per live opportunity: name + description + next_step."""
    for r in conn.execute(
        """SELECT id, contact_id, name, description, next_step, updated_at
        FROM opportunities WHERE deleted_at IS NULL"""
    ):
        lp, lp_name, person = contact_lp(r["contact_id"])
        yield _mk(
            f"opportunities:{r['id']}", lp, lp_name, person,
            "opportunity", to_epoch(r["updated_at"]),
            _nonblank_join(r["name"], r["description"], r["next_step"]),
            "opportunities", r["id"],
        )


def _org_description_chunks(conn, org_canon, name):
    """Yield one chunk per live organization with a non-empty description (doc_type = org_note)."""
    for r in conn.execute(
        """SELECT id, description, updated_at FROM organizations
        WHERE description IS NOT NULL AND description <> '' AND deleted_at IS NULL"""
    ):
        lp = org_canon.get(r["id"])
        yield _mk(
            f"organizations.description:{r['id']}", lp, name.get(lp), None,
            "org_note", to_epoch(r["updated_at"]), r["description"],
            "organizations", r["id"],
        )


def _outreach_note_chunks(conn, inv_canon, name):
    """Yield one chunk per non-blank line of fundraising_investors.notes (doc_type = outreach_note).

    The key suffix is the line's index among ALL lines, blank ones included,
    so inserting or removing an earlier line re-keys every later line.
    NOTE(review): whether points stored under the old keys get pruned is
    decided outside this module. Confirm.
    Every line shares the row's updated_at as its date_ts. Dates written
    inside the log lines are not parsed.
    """
    for r in conn.execute(
        """SELECT id, notes, updated_at FROM fundraising_investors
        WHERE notes IS NOT NULL AND notes <> ''"""
    ):
        lp = inv_canon.get(r["id"])
        lp_name = name.get(lp)
        date_ts = to_epoch(r["updated_at"])  # same for every line of this row
        for i, line in enumerate(str(r["notes"]).splitlines()):
            if line.strip():
                yield _mk(
                    f"fundraising_investors.notes:{r['id']}:{i}", lp, lp_name, None,
                    "outreach_note", date_ts, line, "fundraising_investors", r["id"],
                )


def _email_chunks(conn, inv_canon, org_canon, person_canon, name):
    """Yield one chunk per matched email (doc_type = email).

    Nothing is yielded unless both the ``emails`` and ``email_investor_links``
    tables exist. The text is the subject plus body_text, falling back to
    snippet. lp_id is resolved by ``_email_lp`` and person_id is left None.
    """
    if not (_has_table(conn, "emails") and _has_table(conn, "email_investor_links")):
        return
    for r in conn.execute("SELECT id, subject, body_text, snippet, sent_at FROM emails WHERE is_matched=1"):
        lp, lp_name = _email_lp(conn, r["id"], inv_canon, org_canon, person_canon, name)
        yield _mk(
            f"emails:{r['id']}", lp, lp_name, None, "email",
            to_epoch(r["sent_at"]),
            _nonblank_join(r["subject"], r["body_text"] or r["snippet"]),
            "emails", r["id"],
        )


def build_chunks(conn):
    """Build every retrieval chunk for the CRM database behind *conn*.

    Sources are read in a fixed order, and the returned list keeps it:
        1. communications
        2. contacts.notes
        3. opportunities
        4. organizations.description
        5. fundraising_investors.notes
        6. matched emails, only when both ``emails`` and
           ``email_investor_links`` exist

    A candidate with blank text or no resolvable lp_id is dropped, because
    ``_mk`` returns None for it.

    *conn* must yield rows addressable by column name (e.g. ``sqlite3.Row``).
    Returns a list of chunk dicts as built by ``_mk``.
    """
    person_canon, org_canon, inv_canon, name, contact_org = _canon_maps(conn)

    def contact_lp(cid):
        # Contact-anchored sources share one resolution rule (see _contact_lp).
        return _contact_lp(cid, person_canon, org_canon, name, contact_org)

    chunks = []
    # Each generator is drained completely before the next source is queried.
    chunks.extend(_communication_chunks(conn, contact_lp))
    chunks.extend(_contact_note_chunks(conn, contact_lp))
    chunks.extend(_opportunity_chunks(conn, contact_lp))
    chunks.extend(_org_description_chunks(conn, org_canon, name))
    chunks.extend(_outreach_note_chunks(conn, inv_canon, name))
    chunks.extend(_email_chunks(conn, inv_canon, org_canon, person_canon, name))
    return [c for c in chunks if c]
def _has_table(conn, name):
    """Return True when sqlite_master lists a table (not a view) called *name*."""
    query = "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?"
    cursor = conn.execute(query, (name,))
    return cursor.fetchone() is not None
def _email_lp(conn, email_id, inv_canon, org_canon, person_canon, name):
    """Resolve a matched email's ``(lp_id, lp_name)`` from email_investor_links.

    Only the link with the highest match_confidence is considered. Within
    that link, the canonical id is tried in this order:
    fundraising_investor -> contact -> organization.
    The first truthy id wins. If none is truthy, the last lookup's value is
    returned, matching a plain ``or`` chain. Returns ``(None, None)`` when
    the email has no link row.
    """
    link = conn.execute(
        "SELECT fundraising_investor_id, contact_id, organization_id\n"
        "FROM email_investor_links WHERE email_id=? ORDER BY match_confidence DESC LIMIT 1",
        (email_id,),
    ).fetchone()
    if not link:
        return None, None
    precedence = (
        (inv_canon, "fundraising_investor_id"),
        (person_canon, "contact_id"),
        (org_canon, "organization_id"),
    )
    lp = None
    for canon, column in precedence:
        lp = canon.get(link[column])
        if lp:
            break
    return lp, name.get(lp)
if __name__ == "__main__":
    # CLI smoke check: build every chunk from a database and print a summary.
    import argparse
    from collections import Counter
    from contextlib import closing

    from config import DEFAULT_DB

    ap = argparse.ArgumentParser(
        description="Build retrieval chunks from the CRM database and print a summary."
    )
    ap.add_argument("--db", default=DEFAULT_DB, help="SQLite database path (default: config.DEFAULT_DB)")
    args = ap.parse_args()
    # closing() releases the connection even if build_chunks raises. Using a
    # sqlite3.Connection as a context manager only commits/rolls back; it
    # does not close.
    with closing(sqlite3.connect(args.db)) as conn:
        conn.row_factory = sqlite3.Row
        chunks = build_chunks(conn)
    print(f"{len(chunks)} chunks from {args.db}")
    for dt, n in Counter(c["doc_type"] for c in chunks).most_common():
        print(f" {dt:<16} {n}")
    unresolved = sum(1 for c in chunks if not c["lp_id"])
    print(f" (all chunks have an lp_id: {unresolved == 0})")
    if not chunks:
        # An empty database yields no chunks, so there is nothing to sample.
        print("\nNo chunks to sample.")
    else:
        print("\nSample chunk:")
        sample = chunks[0]
        # Mark truncated text with an ellipsis so the cut is visible.
        print({k: (v[:80] + "..." if k == "text" and v and len(v) > 80 else v) for k, v in sample.items()})