""" Threading resolution. Given a freshly-inserted emails row (or its about-to-be-inserted parsed dict), figure out which email_threads row it belongs to. If none exists, create one. Priority order (see architecture doc §10): 1. Existing email in our DB that shares any RFC Message-ID with this one's References/In-Reply-To chain — inherit its thread. 2. Existing thread with the same gmail_thread_id. 3. Existing thread with the same rfc_thread_root_id. 4. Create a new thread. """ import re import sqlite3 from typing import Optional from . import db as _db SUBJECT_PREFIX_RE = re.compile(r"^\s*(re|fwd?|aw|sv|antw|回复|fw)\s*:\s*", re.IGNORECASE) def normalize_subject(s: Optional[str]) -> Optional[str]: if not s: return None out = s # Strip up to 5 nested Re:/Fwd: prefixes. for _ in range(5): new = SUBJECT_PREFIX_RE.sub("", out, count=1) if new == out: break out = new return out.strip().lower() def resolve_thread_id(conn: sqlite3.Connection, parsed: dict) -> str: """Returns a thread_id — either an existing one or a newly created one.""" # Step 1: RFC cross-link. candidates = list(parsed.get("references") or []) if parsed.get("in_reply_to"): candidates.append(parsed["in_reply_to"]) if candidates: existing_email_id = _db.find_email_id_by_any_rfc_id(conn, candidates) if existing_email_id: cur = conn.cursor() cur.execute("SELECT thread_id FROM emails WHERE id = ?", (existing_email_id,)) row = cur.fetchone() if row and row["thread_id"]: return row["thread_id"] # Step 2: gmail_thread_id match. gt = parsed.get("gmail_thread_id") if gt: existing = _db.find_thread_by_gmail_id(conn, gt) if existing: return existing["id"] # Step 3: RFC thread-root match. rfc_root = parsed.get("rfc_thread_root_id") if rfc_root: existing = _db.find_thread_by_rfc_root(conn, rfc_root) if existing: return existing["id"] # Step 4: create. return _db.create_thread( conn, gmail_thread_id=gt, rfc_thread_root_id=rfc_root, subject_normalized=normalize_subject(parsed.get("subject")), first_message_at=parsed.get("sent_at"), )