Keysat c7ce44d963 Phase 0 foundation: canonical schema, ingest pipeline, CRM MCP server
Workstream A–C substrate for the Ten31 agentic system:
- A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9
- A2: additive/reversible core migration (canonical_entities, entity_links,
  interaction_log, relationship_edges, soft-delete) + ledgered runner
- B1/B3: chunking + deterministic entity resolution (backend/ingest)
- B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks
- C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools
- docs: redaction/re-hydration, Gmail enablement runbook
- synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db,
  drop legacy files + start9/0.3.5)

Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity
queries). Real backfill runs on Ten31 infra; index holds synthetic data only.
Branch snapshot also captures pre-existing working-tree changes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 08:13:35 -05:00


"""
Threading resolution.

Given a freshly-inserted emails row (or its about-to-be-inserted parsed dict),
figure out which email_threads row it belongs to. If none exists, create one.

Priority order (see architecture doc §10):
  1. Existing email in our DB that shares any RFC Message-ID with this one's
     References/In-Reply-To chain: inherit its thread.
  2. Existing thread with the same gmail_thread_id.
  3. Existing thread with the same rfc_thread_root_id.
  4. Create a new thread.
"""

import re
import sqlite3
from typing import Optional

from . import db as _db

SUBJECT_PREFIX_RE = re.compile(r"^\s*(re|fwd?|aw|sv|antw|回复|fw)\s*:\s*", re.IGNORECASE)


def normalize_subject(s: Optional[str]) -> Optional[str]:
    if not s:
        return None
    out = s
    # Strip up to 5 nested Re:/Fwd: prefixes.
    for _ in range(5):
        new = SUBJECT_PREFIX_RE.sub("", out, count=1)
        if new == out:
            break
        out = new
    return out.strip().lower()


def resolve_thread_id(conn: sqlite3.Connection, parsed: dict) -> str:
    """Return a thread_id: either an existing one or a newly created one."""
    # Step 1: RFC cross-link.
    candidates = list(parsed.get("references") or [])
    if parsed.get("in_reply_to"):
        candidates.append(parsed["in_reply_to"])
    if candidates:
        existing_email_id = _db.find_email_id_by_any_rfc_id(conn, candidates)
        if existing_email_id:
            cur = conn.cursor()
            cur.execute("SELECT thread_id FROM emails WHERE id = ?", (existing_email_id,))
            row = cur.fetchone()
            # Name-based access requires a mapping-style row factory on conn
            # (e.g. sqlite3.Row); plain tuple rows would raise here.
            if row and row["thread_id"]:
                return row["thread_id"]

    # Step 2: gmail_thread_id match.
    gt = parsed.get("gmail_thread_id")
    if gt:
        existing = _db.find_thread_by_gmail_id(conn, gt)
        if existing:
            return existing["id"]

    # Step 3: RFC thread-root match.
    rfc_root = parsed.get("rfc_thread_root_id")
    if rfc_root:
        existing = _db.find_thread_by_rfc_root(conn, rfc_root)
        if existing:
            return existing["id"]

    # Step 4: create.
    return _db.create_thread(
        conn,
        gmail_thread_id=gt,
        rfc_thread_root_id=rfc_root,
        subject_normalized=normalize_subject(parsed.get("subject")),
        first_message_at=parsed.get("sent_at"),
    )
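
The priority order can be exercised in isolation against an in-memory SQLite database. The following is a minimal sketch, not the project's implementation: the two-table schema, the `rfc_message_id` column, and the `open_demo_db`, `resolve`, and `store` helpers are hypothetical stand-ins, with the `_db` lookups replaced by inline SQL. It is meant only to show that an RFC cross-link (step 1) takes precedence over a differing `gmail_thread_id` (step 2).

```python
import sqlite3
import uuid

# Hypothetical minimal schema; the real tables are defined elsewhere in the project.
SCHEMA = """
CREATE TABLE email_threads (
    id TEXT PRIMARY KEY,
    gmail_thread_id TEXT,
    rfc_thread_root_id TEXT
);
CREATE TABLE emails (
    id TEXT PRIMARY KEY,
    rfc_message_id TEXT,
    thread_id TEXT
);
"""


def open_demo_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # enables row["column"] access
    conn.executescript(SCHEMA)
    return conn


def resolve(conn: sqlite3.Connection, parsed: dict) -> str:
    """Stand-in for the four-step ladder, with the _db helpers inlined as SQL."""
    # Step 1: any known email whose Message-ID appears in References/In-Reply-To.
    candidates = list(parsed.get("references") or [])
    if parsed.get("in_reply_to"):
        candidates.append(parsed["in_reply_to"])
    if candidates:
        marks = ",".join("?" * len(candidates))
        row = conn.execute(
            f"SELECT thread_id FROM emails "
            f"WHERE rfc_message_id IN ({marks}) AND thread_id IS NOT NULL LIMIT 1",
            candidates,
        ).fetchone()
        if row:
            return row["thread_id"]

    # Steps 2 and 3: match an existing thread by Gmail id, then by RFC root id.
    for column in ("gmail_thread_id", "rfc_thread_root_id"):
        value = parsed.get(column)
        if value:
            row = conn.execute(
                f"SELECT id FROM email_threads WHERE {column} = ?", (value,)
            ).fetchone()
            if row:
                return row["id"]

    # Step 4: nothing matched, so create a new thread.
    thread_id = uuid.uuid4().hex
    conn.execute(
        "INSERT INTO email_threads (id, gmail_thread_id, rfc_thread_root_id) "
        "VALUES (?, ?, ?)",
        (thread_id, parsed.get("gmail_thread_id"), parsed.get("rfc_thread_root_id")),
    )
    return thread_id


def store(conn: sqlite3.Connection, parsed: dict) -> str:
    """Resolve the thread, insert the email row, and return the thread id."""
    thread_id = resolve(conn, parsed)
    conn.execute(
        "INSERT INTO emails (id, rfc_message_id, thread_id) VALUES (?, ?, ?)",
        (uuid.uuid4().hex, parsed["rfc_message_id"], thread_id),
    )
    return thread_id


conn = open_demo_db()
first = store(conn, {"rfc_message_id": "<a@example.com>", "gmail_thread_id": "g1"})
# Replies to <a@example.com> but carries a different Gmail thread id: step 1 wins.
reply = store(conn, {"rfc_message_id": "<b@example.com>",
                     "in_reply_to": "<a@example.com>", "gmail_thread_id": "g2"})
# No RFC link, same Gmail thread id as the first message: step 2 matches.
same_gmail = store(conn, {"rfc_message_id": "<c@example.com>", "gmail_thread_id": "g1"})
# Nothing in common with earlier mail: step 4 creates a new thread.
unrelated = store(conn, {"rfc_message_id": "<d@example.com>", "gmail_thread_id": "g3"})
```

In this sketch `reply` and `same_gmail` resolve to the same thread as `first`, while `unrelated` gets a fresh one. Note that the thread keeps the `gmail_thread_id` of the message that created it, so a later message carrying only `g2` would not match in step 2; whether the real pipeline handles that case is not stated in the source.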