"""
Parse a Gmail `users.messages.get` response (format=full) into a flat dict
ready for db.insert_email().

Input shape (abbreviated):

    {
      "id": "...",                       # Gmail message id
      "threadId": "...",
      "labelIds": ["INBOX","IMPORTANT",...],
      "snippet": "...",
      "historyId": "...",
      "internalDate": "1713657600000",   # ms epoch, authoritative
      "sizeEstimate": 12345,
      "payload": {
        "headers": [{"name":"Subject","value":"..."}, ...],
        "mimeType": "multipart/mixed",
        "parts": [...recursive...],
        "body": {"data": "", "size": ...}
      }
    }
"""

import base64
import email.utils
import email.header
import re
from datetime import datetime, timezone
from typing import Any, Iterable, Optional
from html.parser import HTMLParser


# ---------------------------------------------------------------------------- public

def parse(message: dict, *, owning_account_address: Optional[str] = None) -> dict:
    """Parse a Gmail message payload into our canonical dict shape.

    Args:
        message: The decoded JSON body of a `users.messages.get` call made
            with ``format=full``. Missing or null keys are tolerated.
        owning_account_address: Passed through untouched as ``owning_account``.

    Returns:
        A flat dict with ids, threading ids, decoded subject, lower-cased
        addresses, an ISO-8601 UTC ``sent_at``, size-capped bodies, attachment
        descriptors, labels and the lower-cased raw header map.
    """
    # `payload` may be absent *or* explicitly null; both mean "nothing to walk".
    payload = message.get("payload") or {}
    headers = _header_map(payload.get("headers") or [])

    from_name, from_email = _split_addr(headers.get("from", ""))
    reply_to = _split_addr(headers.get("reply-to", ""))[1]
    sent_at = _parse_date_header(
        headers.get("date"), fallback_ms=message.get("internalDate")
    )

    # A blank or "<>" Message-ID is useless as a key, so synthesize one from
    # the Gmail id in that case.
    rfc_mid = (
        _strip_angle_brackets(headers.get("message-id", ""))
        or f"synthetic-{message.get('id')}@ten31.local"
    )
    in_reply_to = _first_message_id(headers.get("in-reply-to", ""))
    references = _split_references(headers.get("references", ""))
    # Thread root: oldest reference, else the direct parent, else ourselves.
    rfc_thread_root_id = references[0] if references else (in_reply_to or rfc_mid)

    body_text, body_html, attachments = _walk_payload(payload)
    labels = message.get("labelIds") or []

    return {
        "gmail_message_id": message.get("id"),
        "gmail_thread_id": message.get("threadId"),
        "rfc_message_id": rfc_mid,
        "rfc_thread_root_id": rfc_thread_root_id,
        "in_reply_to": in_reply_to,
        "references": references,
        "subject": _decode_rfc2047(headers.get("subject") or ""),
        "from_email": (from_email or "").lower(),
        "from_name": from_name,
        "to": _address_dicts(headers.get("to", "")),
        "cc": _address_dicts(headers.get("cc", "")),
        "bcc": _address_dicts(headers.get("bcc", "")),
        "reply_to": reply_to.lower() if reply_to else None,
        "sent_at": sent_at,
        "body_text": _cap_text(body_text),
        "body_html": _cap_text(body_html),
        "snippet": message.get("snippet"),
        "attachments": attachments,
        "size_estimate": message.get("sizeEstimate"),
        "labels": labels,
        "is_sent": "SENT" in labels,
        "raw_headers": headers,
        "owning_account": owning_account_address,
    }


# ---------------------------------------------------------------------------- headers

# One RFC 5322 msg-id in angle brackets; the capture excludes the brackets.
_MSG_ID_RE = re.compile(r"<([^<>\s]+)>")

# `charset=` parameter of a Content-Type header value, optionally quoted.
_CHARSET_RE = re.compile(r"""charset\s*=\s*["']?([^\s"';]+)""", re.IGNORECASE)

_ISO_UTC_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def _header_map(header_list: Iterable[dict]) -> dict[str, str]:
    """Case-insensitive keys. Last-write-wins for duplicates (rare)."""
    return {
        (h.get("name") or "").lower(): (h.get("value") or "")
        for h in header_list
    }


def _decode_rfc2047(s: str) -> str:
    """Decode RFC 2047 encoded-words in a header value; return *s* on failure."""
    if not s:
        return ""
    try:
        pieces = []
        for chunk, charset in email.header.decode_header(s):
            if not isinstance(chunk, bytes):
                pieces.append(chunk)
            elif charset is None:
                # Un-encoded segments of a mixed header come back as
                # raw-unicode-escape bytes, i.e. one Latin-1 byte per code
                # point <= U+00FF. Decoding them as UTF-8 would turn "é" into
                # U+FFFD; Latin-1 round-trips them and cannot fail.
                pieces.append(chunk.decode("latin-1"))
            else:
                try:
                    pieces.append(chunk.decode(charset or "utf-8", errors="replace"))
                except LookupError:
                    # Unknown/bogus charset label: best effort as UTF-8.
                    pieces.append(chunk.decode("utf-8", errors="replace"))
        return "".join(pieces)
    except Exception:
        # Deliberate best-effort boundary: a malformed encoded-word must never
        # abort the whole message parse, so fall back to the raw header text.
        return s


def _split_addr(raw: str) -> tuple[Optional[str], Optional[str]]:
    """Split a single-address header into (display name, address)."""
    if not raw:
        return (None, None)
    name, addr = email.utils.parseaddr(raw)
    return (_decode_rfc2047(name) or None, addr or None)


def _parse_address_list(raw: str) -> list[tuple[Optional[str], Optional[str]]]:
    """Split a multi-address header into (display name, address) pairs.

    Entries without an address are dropped.
    """
    if not raw:
        return []
    return [
        (_decode_rfc2047(name) or None, addr)
        for name, addr in email.utils.getaddresses([raw])
        if addr
    ]


def _address_dicts(raw: str) -> list[dict]:
    """Turn an address-list header into ``[{"email": ..., "name": ...}]``."""
    return [
        {"email": addr.lower(), "name": name}
        for name, addr in _parse_address_list(raw)
        if addr
    ]


def _parse_date_header(raw: Optional[str], *, fallback_ms: Optional[str]) -> str:
    """Return the send time as ``YYYY-MM-DDTHH:MM:SSZ`` (UTC).

    Prefers the RFC Date header, falls back to Gmail internalDate (epoch ms),
    and finally to the current time if neither can be interpreted.
    """
    if raw:
        try:
            dt = email.utils.parsedate_to_datetime(raw)
            if dt is not None:
                if dt.tzinfo is None:
                    dt = dt.replace(tzinfo=timezone.utc)
                return dt.astimezone(timezone.utc).strftime(_ISO_UTC_FORMAT)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: a year-9999 date with a negative offset parses
            # fine but overflows when shifted to UTC.
            pass
    if fallback_ms:
        try:
            dt = datetime.fromtimestamp(int(fallback_ms) / 1000.0, tz=timezone.utc)
            return dt.strftime(_ISO_UTC_FORMAT)
        except (TypeError, ValueError, OverflowError, OSError):
            # fromtimestamp raises OverflowError/OSError for out-of-range values.
            pass
    return datetime.now(tz=timezone.utc).strftime(_ISO_UTC_FORMAT)


def _split_references(raw: str) -> list[str]:
    """Split a References-style header into bare message ids, oldest first.

    Handles ids separated by whitespace as well as ids written back to back
    (``<a@x><b@x>``), which RFC 5322 permits. Tokens without angle brackets
    are kept as-is; empty ids are dropped.
    """
    if not raw:
        return []
    refs: list[str] = []
    for token in raw.split():
        bracketed = _MSG_ID_RE.findall(token)
        if bracketed:
            refs.extend(bracketed)
        else:
            refs.append(_strip_angle_brackets(token))
    return [ref for ref in refs if ref]


def _first_message_id(raw: str) -> Optional[str]:
    """Return the first message id in *raw*, or None when there is none.

    Prefers the first ``<...>`` id so that extra ids or trailing comments in
    an In-Reply-To header do not end up inside the id; otherwise falls back to
    the whole stripped value.
    """
    match = _MSG_ID_RE.search(raw or "")
    if match:
        return match.group(1)
    return _strip_angle_brackets(raw) or None


def _strip_angle_brackets(s: str) -> str:
    """Strip surrounding whitespace and one enclosing ``<...>`` pair."""
    s = (s or "").strip()
    if s.startswith("<") and s.endswith(">"):
        return s[1:-1]
    return s


# ---------------------------------------------------------------------------- MIME walk

def _walk_payload(payload: dict) -> tuple[Optional[str], Optional[str], list[dict]]:
    """Returns (body_text, body_html, attachments).

    Depth-first walk. First text/plain wins for body_text; first text/html wins
    for body_html. Anything with a filename or attachment disposition becomes
    an attachment entry. If only HTML is present, body_text is derived from it.
    """
    text: Optional[str] = None
    html_body: Optional[str] = None
    attachments: list[dict] = []

    def visit(part: dict) -> None:
        nonlocal text, html_body
        mime = (part.get("mimeType") or "").lower()
        filename = part.get("filename") or ""
        body = part.get("body") or {}
        headers = _header_map(part.get("headers") or [])
        disposition = (headers.get("content-disposition") or "").lower()

        if filename or disposition.startswith("attachment"):
            attachments.append({
                "filename": filename or f"unnamed.{_ext_for(mime)}",
                "mime_type": mime or "application/octet-stream",
                "size": body.get("size"),
                "gmail_attachment_id": body.get("attachmentId"),
                # Some tiny attachments come inlined as base64; attachmentId is
                # then missing and data is in body.data. sync.py handles both.
                "inline_data_b64": body.get("data"),
                "content_disposition": (
                    "inline" if disposition.startswith("inline") else "attachment"
                ),
            })
        elif mime == "text/plain" and text is None:
            text = _decode_body(body, charset=_declared_charset(headers))
        elif mime == "text/html" and html_body is None:
            html_body = _decode_body(body, charset=_declared_charset(headers))

        for child in part.get("parts") or []:
            visit(child)

    visit(payload or {})

    # Derive a plain-text body from HTML if only HTML exists.
    if text is None and html_body:
        text = _strip_html(html_body)
    return text, html_body, attachments


def _declared_charset(headers: dict) -> Optional[str]:
    """Return the ``charset=`` parameter of a part's Content-Type, if any."""
    match = _CHARSET_RE.search(headers.get("content-type") or "")
    return match.group(1) if match else None


def _decode_body(body: dict, charset: Optional[str] = None) -> Optional[str]:
    """Decode a part's base64url ``body.data`` into text.

    Args:
        body: The part's ``body`` dict.
        charset: Charset declared by the part; UTF-8 is used when it is
            missing or not a usable text codec.

    Returns:
        The decoded text with CRLF normalized to LF, or None when there is no
        data or it is not valid base64.
    """
    data = body.get("data")
    if not data:
        return None
    try:
        # The data may arrive without "=" padding; restore it before decoding.
        padded = data + "=" * (-len(data) % 4)
        raw = base64.urlsafe_b64decode(padded.encode("ascii"))
    except (ValueError, TypeError, AttributeError):
        # Covers binascii.Error and non-ASCII / non-string data.
        return None
    try:
        decoded = raw.decode(charset or "utf-8", errors="replace")
    except (LookupError, ValueError):
        # Unknown or non-text codec name in the header: best effort as UTF-8.
        decoded = raw.decode("utf-8", errors="replace")
    return decoded.replace("\r\n", "\n")


# ---------------------------------------------------------------------------- HTML stripping

class _HTMLToText(HTMLParser):
    """Collect visible text from HTML, with newlines around block-ish tags."""

    _SKIPPED_TAGS = frozenset({"script", "style"})
    _NEWLINE_ON_OPEN = frozenset({"br", "p", "div", "tr", "li"})
    _NEWLINE_ON_CLOSE = frozenset({"p", "div", "tr"})

    def __init__(self):
        super().__init__()
        self._parts: list[str] = []
        # >0 while inside <script>/<style>, whose contents are not shown.
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIPPED_TAGS:
            self._skip_depth += 1
        if tag in self._NEWLINE_ON_OPEN:
            self._parts.append("\n")

    def handle_endtag(self, tag):
        if tag in self._SKIPPED_TAGS:
            # Clamp at zero so a stray closing tag cannot unbalance the count.
            self._skip_depth = max(0, self._skip_depth - 1)
        if tag in self._NEWLINE_ON_CLOSE:
            self._parts.append("\n")

    def handle_data(self, data):
        if self._skip_depth == 0:
            self._parts.append(data)

    def text(self) -> str:
        """Return the collected text with runs of 3+ newlines collapsed to 2."""
        raw = "".join(self._parts)
        return re.sub(r"\n{3,}", "\n\n", raw).strip()


def _strip_html(html: str) -> str:
    """Convert HTML to plain text; fall back to crude tag removal on error."""
    p = _HTMLToText()
    try:
        p.feed(html)
        # close() flushes text the parser is still buffering, e.g. a trailing
        # "AT&T" that it holds back in case the "&" starts a character
        # reference. Without it that trailing text is silently lost.
        p.close()
        return p.text()
    except Exception:
        # Deliberate best-effort: any parser failure degrades to tag stripping.
        return re.sub(r"<[^>]+>", " ", html)


def _ext_for(mime: str) -> str:
    """Guess a file extension from a MIME type: its subtype, else "bin"."""
    return mime.split("/")[-1] if "/" in mime else "bin"


# ---------------------------------------------------------------------------- caps

# Keep bodies bounded to avoid a pathological 500MB message exploding the DB.
_BODY_CAP_BYTES = 10 * 1024 * 1024  # 10MB

_TRUNCATION_NOTICE = "\n\n[TRUNCATED BY CRM — body exceeded 10MB]"


def _cap_text(s: Optional[str]) -> Optional[str]:
    """Bound *s* to ``_BODY_CAP_BYTES`` of UTF-8, appending a notice if cut.

    Truncation is done on the encoded bytes, so the result (notice included)
    never exceeds the cap regardless of how many bytes each character needs.
    Strings within the cap are returned unchanged; None stays None.
    """
    if s is None:
        return None
    # UTF-8 needs at most 4 bytes per code point, so short strings can skip
    # the (potentially large) encode entirely.
    if len(s) * 4 <= _BODY_CAP_BYTES:
        return s
    encoded = s.encode("utf-8", errors="ignore")
    if len(encoded) <= _BODY_CAP_BYTES:
        return s
    budget = max(0, _BODY_CAP_BYTES - len(_TRUNCATION_NOTICE.encode("utf-8")))
    # errors="ignore" drops a multi-byte character split by the byte slice.
    kept = encoded[:budget].decode("utf-8", errors="ignore")
    return kept + _TRUNCATION_NOTICE