c7ce44d963
Workstream A–C substrate for the Ten31 agentic system: - A1: docs/crm-overview.md; CLAUDE.md conventions + guardrail #9 - A2: additive/reversible core migration (canonical_entities, entity_links, interaction_log, relationship_edges, soft-delete) + ledgered runner - B1/B3: chunking + deterministic entity resolution (backend/ingest) - B2: dense (bge-m3) + BM25 sparse ingest to Qdrant crm_chunks - C: CRM MCP server (reads, retrieval modes, logged writes) — no outbound tools - docs: redaction/re-hydration, Gmail enablement runbook - synthetic test data; .env.example; housekeeping (.gitignore, untrack crm.db, drop legacy files + start9/0.3.5) Verified end-to-end on synthetic data + live Sparks (hybrid > dense on entity queries). Real backfill runs on Ten31 infra; index holds synthetic data only. Branch snapshot also captures pre-existing working-tree changes. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
235 lines
7.9 KiB
Python
235 lines
7.9 KiB
Python
"""
|
|
Attachment download + on-disk storage.
|
|
|
|
Two usage patterns:
|
|
|
|
1. During message parsing we call `register_stubs(conn, email_id, parsed)`
|
|
which writes pending rows to email_attachments.
|
|
|
|
2. A separate worker (kicked off by sync after each account completes)
|
|
calls `drain_pending()` which fetches attachment bytes from Gmail and
|
|
writes them to disk under CONFIG.attachments_dir.
|
|
|
|
Files are named: <CRM_DATA_DIR>/email_attachments/<email_id[:2]>/<email_id>/<attachment_id>-<sanitized_filename>
|
|
|
|
Sanitization prevents path traversal and keeps cross-platform-safe names.
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
from typing import Iterable, Optional
|
|
|
|
from . import config as _cfg
|
|
from . import db as _db
|
|
from . import errors as _errors
|
|
from . import gmail_client as _gmail
|
|
|
|
|
|
# Upper bound, in characters, on the length of a sanitized attachment filename.
_MAX_FILENAME_LEN = 200
# Runs of characters that must not appear in a stored filename: path
# separators, ASCII control characters (0x00-0x1f, 0x7f) and : * ? " < > |
_BAD_FILENAME_CHARS = re.compile(r'[/\\\x00-\x1f\x7f:*?"<>|]+')


def _sanitize_filename(name: str) -> str:
    """Return a filesystem-safe version of an attachment filename.

    Directory components (with either ``/`` or ``\\`` separators) are dropped,
    each run of unsafe characters is replaced by a single ``_``, and leading or
    trailing spaces and dots are stripped. An empty result becomes
    ``"unnamed.bin"``.

    The result is never longer than ``_MAX_FILENAME_LEN`` characters. When
    truncation is needed the extension (text after the last dot) is kept if at
    least one character of the stem still fits; otherwise the whole name is cut
    at the limit.

    Args:
        name: The filename as reported by the message; may be empty.

    Returns:
        The sanitized filename.
    """
    if not name:
        return "unnamed.bin"
    # strip path components first
    name = os.path.basename(name.replace("\\", "/"))
    name = _BAD_FILENAME_CHARS.sub("_", name).strip(" .")
    if not name:
        name = "unnamed.bin"
    if len(name) > _MAX_FILENAME_LEN:
        stem, dot, ext = name.rpartition(".")
        # Characters left for the stem once ".<ext>" is accounted for.
        room = _MAX_FILENAME_LEN - len(ext) - 1
        if dot and room > 0:
            name = stem[:room] + "." + ext
        else:
            # No dot, or an "extension" so long that a negative slice index
            # would leave the name over the limit: cut the whole name instead.
            name = name[:_MAX_FILENAME_LEN]
        # A hard cut can expose a trailing dot/space; keep the earlier invariant.
        name = name.rstrip(" .")
    return name
|
|
|
|
|
|
def _storage_path_for(email_id: str, attachment_id: str, sanitized_filename: str) -> str:
    """Build the on-disk path for one attachment and create its directory.

    Layout: ``<attachments_dir>/<email_id[:2]>/<email_id>/<attachment_id>-<sanitized_filename>``.
    The first two characters of ``email_id`` form the bucket directory; an
    empty ``email_id`` falls back to the bucket ``"_0"``.

    Note that calling this has a side effect: the per-email directory is
    created (if missing) even when nothing is written afterwards.

    NOTE(review): ``email_id`` and ``attachment_id`` are joined into the path
    as-is; this assumes callers pass identifiers without path separators.
    """
    base_dir = _cfg.CONFIG.attachments_dir
    prefix = email_id[:2]
    shard = prefix if prefix else "_0"
    target_dir = os.path.join(base_dir, shard, email_id)
    os.makedirs(target_dir, exist_ok=True)
    leaf = f"{attachment_id}-{sanitized_filename}"
    return os.path.join(target_dir, leaf)
|
|
|
|
|
|
# ---------------------------------------------------------------------------- phase 1: register stubs
|
|
|
|
def register_stubs(conn: sqlite3.Connection, *, email_id: str,
                   parsed_attachments: Iterable[dict]) -> list[str]:
    """Insert one pending email_attachments row per parsed attachment.

    Each dict in ``parsed_attachments`` is read for the keys ``filename``,
    ``gmail_attachment_id``, ``mime_type``, ``size`` and ``inline_data_b64``.

    After the stub row is inserted:

    * if the declared ``size`` is an int above the configured maximum, the row
      is marked ``skipped`` and nothing else happens for it;
    * if ``inline_data_b64`` is present (bytes arrived with the message body),
      the data is decoded, written to disk and the row is marked downloaded;
      any failure on that path marks the row failed instead of raising.

    Returns:
        The ids returned by ``_db.insert_attachment_stub``, in input order
        (skipped and failed rows included).
    """
    size_cap = _cfg.CONFIG.max_attachment_mb * 1024 * 1024
    created = []

    for item in parsed_attachments:
        original_name = item.get("filename") or "unnamed.bin"
        safe_name = _sanitize_filename(original_name)
        remote_id = item.get("gmail_attachment_id") or ""
        mime_type = item.get("mime_type")
        declared_size = item.get("size")

        # The row id is not known before the insert, so the initial path is
        # keyed on the Gmail attachment id (or a random token). It is replaced
        # by a row-id based path once the bytes are actually written.
        placeholder_path = _storage_path_for(
            email_id, remote_id or att_row_id_fallback(), safe_name
        )
        row_id = _db.insert_attachment_stub(
            conn,
            email_id=email_id,
            gmail_attachment_id=remote_id,
            filename=original_name,
            sanitized_filename=safe_name,
            mime_type=mime_type,
            size_bytes=declared_size,
            storage_path=placeholder_path,
        )
        created.append(row_id)

        # Oversize guard: record why the attachment will never be fetched.
        if isinstance(declared_size, int) and declared_size > size_cap:
            conn.execute(
                "UPDATE email_attachments SET download_status = 'skipped', "
                "download_error = ? WHERE id = ?",
                (f"exceeds max size {_cfg.CONFIG.max_attachment_mb}MB", row_id),
            )
            continue

        # Inline data fast-path; rows without inline bytes stay pending.
        encoded = item.get("inline_data_b64")
        if not encoded:
            continue
        try:
            payload = base64.urlsafe_b64decode(_pad(encoded).encode("ascii"))
            final_path = _storage_path_for(email_id, row_id, safe_name)
            _write_bytes(final_path, payload)
            digest = hashlib.sha256(payload).hexdigest()
            conn.execute(
                "UPDATE email_attachments SET storage_path = ? WHERE id = ?",
                (final_path, row_id),
            )
            _db.mark_attachment_downloaded(
                conn, row_id, sha256_hex=digest, size_bytes=len(payload)
            )
        except Exception as exc:
            # Best effort: one bad inline part must not abort message parsing.
            _db.mark_attachment_failed(conn, row_id, error=f"inline decode: {exc}")

    return created
|
|
|
|
|
|
def att_row_id_fallback() -> str:
    """Return a random 32-character lowercase hex token.

    Used as a stand-in path component when a stub has no Gmail attachment id,
    so the path template always produces something; the real path is
    rewritten when the worker picks the attachment up.
    """
    from uuid import uuid4

    return uuid4().hex
|
|
|
|
|
|
# ---------------------------------------------------------------------------- phase 2: worker
|
|
|
|
def drain_pending(conn_factory, client: _gmail.GmailClient, account_id: str,
                  *, limit: int = 50) -> int:
    """Download pending attachments that belong to ``account_id``.

    A short-lived connection lists up to ``limit`` pending rows; each matching
    row is then processed on its own connection, which is committed after the
    attempt (success or recorded failure) and always closed.

    Called after each account's sync completes so large files don't block the
    sync loop.

    NOTE(review): ``limit`` is applied when listing pending rows, before the
    account filter below, so rows of other accounts can use up the batch.

    Args:
        conn_factory: Zero-argument callable returning a new DB connection.
        client: Gmail client used to fetch attachment bytes.
        account_id: Only rows whose ``account_id`` equals this are downloaded.
        limit: Maximum number of pending rows to list.

    Returns:
        Count of attachments downloaded successfully.
    """
    listing_conn = conn_factory()
    try:
        candidates = _db.pending_attachments(listing_conn, limit=limit)
    finally:
        listing_conn.close()

    success_count = 0
    for entry in candidates:
        if entry["account_id"] == account_id:
            work_conn = conn_factory()
            try:
                if _download_one(work_conn, client, entry):
                    success_count += 1
                # Commit either way so a recorded failure is persisted too.
                work_conn.commit()
            finally:
                work_conn.close()
    return success_count
|
|
|
|
|
|
def _download_one(conn: sqlite3.Connection, client: _gmail.GmailClient, row) -> bool:
    """Fetch, decode and store a single pending attachment.

    Every failure is recorded on the row via ``_db.mark_attachment_failed``
    and reported by returning ``False``; the caller is responsible for
    committing ``conn``.

    Steps:
      1. Fetch the attachment from Gmail (retryable and other Gmail errors are
         recorded with different messages).
      2. Decode the base64url ``data`` field.
      3. If another downloaded row already has the same SHA-256, point this
         row at that file instead of writing a second copy.
      4. Otherwise write the bytes to a row-id based path and record it.

    Args:
        conn: Open DB connection used for all status updates.
        client: Gmail client providing ``get_attachment``.
        row: Pending row; read for ``id``, ``email_id``, ``gmail_message_id``,
            ``gmail_attachment_id`` and ``sanitized_filename``.

    Returns:
        ``True`` if the row was marked downloaded, ``False`` otherwise.
    """
    try:
        resp = client.get_attachment(row["gmail_message_id"], row["gmail_attachment_id"])
    except _errors.RETRYABLE as e:
        _db.mark_attachment_failed(conn, row["id"], error=f"transient: {type(e).__name__}")
        return False
    except _errors.GmailError as e:
        _db.mark_attachment_failed(conn, row["id"], error=f"{type(e).__name__}: {e}")
        return False

    # Treat a missing/empty response the same as a response without data.
    data_b64 = resp.get("data") if resp else None
    if not data_b64:
        _db.mark_attachment_failed(conn, row["id"], error="empty data in response")
        return False

    try:
        raw = base64.urlsafe_b64decode(_pad(data_b64).encode("ascii"))
    except Exception as e:
        _db.mark_attachment_failed(conn, row["id"], error=f"decode: {e}")
        return False

    sha = hashlib.sha256(raw).hexdigest()
    # If an existing attachment has the same SHA, re-point storage_path and skip write.
    existing = _find_existing_by_sha(conn, sha, exclude_id=row["id"])
    if existing:
        conn.execute(
            "UPDATE email_attachments SET storage_path = ? WHERE id = ?",
            (existing["storage_path"], row["id"]),
        )
        _db.mark_attachment_downloaded(conn, row["id"], sha256_hex=sha, size_bytes=len(raw))
        return True

    try:
        # _storage_path_for creates the target directory, so it can raise
        # OSError as well; keep it inside the handler so a directory-creation
        # failure is recorded on the row instead of aborting the whole drain.
        path = _storage_path_for(row["email_id"], row["id"], row["sanitized_filename"])
        _write_bytes(path, raw)
    except OSError as e:
        _db.mark_attachment_failed(conn, row["id"], error=f"disk: {e}")
        return False

    conn.execute(
        "UPDATE email_attachments SET storage_path = ? WHERE id = ?",
        (path, row["id"]),
    )
    _db.mark_attachment_downloaded(conn, row["id"], sha256_hex=sha, size_bytes=len(raw))
    return True
|
|
|
|
|
|
def _find_existing_by_sha(conn: sqlite3.Connection, sha: str, *, exclude_id: str) -> Optional[sqlite3.Row]:
    """Look up an already-downloaded attachment with the given SHA-256.

    Only rows whose ``download_status`` is ``'downloaded'`` are considered, and
    the row with id ``exclude_id`` is ignored.

    Returns:
        One matching row (all columns), or ``None`` when there is no match.
    """
    query = (
        "SELECT * FROM email_attachments WHERE sha256_hex = ? AND id != ? "
        "AND download_status = 'downloaded' LIMIT 1"
    )
    params = (sha, exclude_id)
    return conn.cursor().execute(query, params).fetchone()
|
|
|
|
|
|
# ---------------------------------------------------------------------------- utils
|
|
|
|
def _pad(b64: str) -> str:
    """Append the ``=`` characters needed to make ``len(b64)`` a multiple of 4.

    Input whose length is already a multiple of 4 (including the empty string)
    is returned unchanged.
    """
    missing = -len(b64) % 4
    return b64 + "=" * missing
|
|
|
|
|
|
def _write_bytes(path: str, data: bytes) -> None:
    """Write ``data`` to ``path`` via a temp file, with owner-only permissions.

    The bytes go to ``path + ".tmp"`` first and are then moved over ``path``
    with ``os.replace``, so a reader never sees a partially written file. The
    temp file is created with mode 0o600 from the start rather than being
    restricted after the write, and it is removed again if anything fails.

    Args:
        path: Destination file path; missing parent directories are created.
        data: Bytes to store.

    Raises:
        OSError: If the directory, the temp file or the rename fails.
    """
    parent = os.path.dirname(path)
    if parent:  # os.makedirs("") raises, so skip it for a bare filename
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".tmp"
    # O_BINARY only exists on Windows, where it disables newline translation.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(tmp, flags, 0o600)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        # The creation mode is ignored when a stale temp file already existed,
        # so enforce the permissions explicitly as well.
        os.chmod(tmp, 0o600)
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup so a failed write does not leave a stray .tmp
        # file behind; the original error is what the caller needs to see.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
|