"""Attachment download + on-disk storage.

Two usage patterns:

1. During message parsing we call
   ``register_stubs(conn, email_id=..., parsed_attachments=...)``, which writes
   pending rows to ``email_attachments``.
2. A separate worker (kicked off by sync after each account completes) calls
   ``drain_pending()``, which fetches attachment bytes from Gmail and writes
   them to disk under ``CONFIG.attachments_dir``.

Downloaded files are named::

    <CONFIG.attachments_dir>/<email_id[:2]>/<email_id>/<attachment_row_id>-<sanitized_filename>

Sanitization prevents path traversal and keeps cross-platform-safe names.
"""

import base64
import contextlib
import hashlib
import os
import re
import sqlite3
import uuid
from typing import Iterable, Optional

from . import config as _cfg
from . import db as _db
from . import errors as _errors
from . import gmail_client as _gmail

# Upper bound (in characters) on a sanitized filename.
# NOTE(review): this counts characters, not bytes; long non-ASCII names may still
# exceed a filesystem's 255-byte name limit once UTF-8 encoded.
_MAX_FILENAME_LEN = 200
# Path separators, ASCII control characters, DEL, and characters reserved on Windows.
_BAD_FILENAME_CHARS = re.compile(r'[/\\\x00-\x1f\x7f:*?"<>|]+')


def _sanitize_filename(name: str) -> str:
    """Return a filesystem-safe version of an attachment's display filename.

    Drops any directory components (forward- or back-slash separated), replaces
    runs of control/reserved characters with ``_``, strips leading/trailing
    spaces and dots, and caps the result at ``_MAX_FILENAME_LEN`` characters,
    preserving the extension when it is short enough to keep a non-empty stem.
    Returns ``"unnamed.bin"`` when nothing usable remains.
    """
    if not name:
        return "unnamed.bin"
    # Strip path components first so "../../x/evil.exe" becomes "evil.exe".
    name = os.path.basename(name.replace("\\", "/"))
    name = _BAD_FILENAME_CHARS.sub("_", name).strip(" .")
    if not name:
        return "unnamed.bin"
    if len(name) > _MAX_FILENAME_LEN:
        stem, dot, ext = name.rpartition(".")
        keep = _MAX_FILENAME_LEN - len(ext) - 1
        if dot and keep > 0:
            # Shorten the stem, keep the extension intact.
            name = stem[:keep] + "." + ext
        else:
            # No extension, or one so long that keeping it leaves no room for the
            # stem (a negative slice used to yield an over-long ".<ext>" name).
            name = name[:_MAX_FILENAME_LEN]
        # Truncation may expose a trailing dot/space, which Windows rejects.
        name = name.rstrip(" .")
    return name


def _storage_path_for(email_id: str, attachment_id: str, sanitized_filename: str,
                      *, create_dir: bool = True) -> str:
    """Return the on-disk path for an attachment.

    Layout: ``<attachments_dir>/<email_id[:2]>/<email_id>/<attachment_id>-<sanitized_filename>``;
    the two-character prefix bucket presumably spreads emails across subdirectories.
    When ``create_dir`` is true (the default) the containing directory is
    created, which may raise ``OSError``; pass ``create_dir=False`` to only
    compute the path.
    """
    root = _cfg.CONFIG.attachments_dir
    bucket = email_id[:2] or "_0"
    dir_ = os.path.join(root, bucket, email_id)
    if create_dir:
        os.makedirs(dir_, exist_ok=True)
    return os.path.join(dir_, f"{attachment_id}-{sanitized_filename}")


def _record_stored(conn: sqlite3.Connection, att_row_id, *, path: str,
                   sha256_hex: str, size_bytes: int) -> None:
    """Point the row at ``path`` and mark it downloaded (does not commit)."""
    conn.execute(
        "UPDATE email_attachments SET storage_path = ? WHERE id = ?",
        (path, att_row_id),
    )
    _db.mark_attachment_downloaded(conn, att_row_id, sha256_hex=sha256_hex, size_bytes=size_bytes)


# ---------------------------------------------------------------------------- phase 1: register stubs


def register_stubs(conn: sqlite3.Connection, *, email_id: str,
                   parsed_attachments: Iterable[dict]) -> list[str]:
    """Write pending attachment rows from parsed message data.

    Each entry of ``parsed_attachments`` is read for the (optional) keys
    ``filename``, ``gmail_attachment_id``, ``mime_type``, ``size`` and
    ``inline_data_b64``. A stub row is inserted for every entry; then:

    * entries whose integer ``size`` exceeds ``CONFIG.max_attachment_mb`` are
      marked ``skipped`` with an explanatory ``download_error``;
    * tiny inline attachments whose bytes arrived with the message body
      (``inline_data_b64`` present, no separate attachmentId) are decoded and
      written directly, and the row is marked downloaded (or failed).

    No commit is issued here.

    Returns the list of attachment row ids created, in input order.
    """
    max_bytes = _cfg.CONFIG.max_attachment_mb * 1024 * 1024
    ids = []
    for att in parsed_attachments:
        filename = att.get("filename") or "unnamed.bin"
        sanitized = _sanitize_filename(filename)
        gmail_att_id = att.get("gmail_attachment_id") or ""
        size = att.get("size")

        # Provisional path: the row id is unknown until after the insert, so key
        # it on the Gmail attachment id (or a random placeholder). It is rewritten
        # once bytes are actually stored; missing files surface via
        # download_status. No directory is created at stub time.
        provisional_path = _storage_path_for(
            email_id, gmail_att_id or att_row_id_fallback(), sanitized, create_dir=False,
        )
        att_row_id = _db.insert_attachment_stub(
            conn,
            email_id=email_id,
            gmail_attachment_id=gmail_att_id,
            filename=filename,
            sanitized_filename=sanitized,
            mime_type=att.get("mime_type"),
            size_bytes=size,
            storage_path=provisional_path,
        )
        ids.append(att_row_id)

        # Oversize guard.
        if isinstance(size, int) and size > max_bytes:
            conn.execute(
                "UPDATE email_attachments SET download_status = 'skipped', "
                "download_error = ? WHERE id = ?",
                (f"exceeds max size {_cfg.CONFIG.max_attachment_mb}MB", att_row_id),
            )
            continue

        # Inline data fast-path.
        inline_b64 = att.get("inline_data_b64")
        if inline_b64:
            _store_inline(conn, email_id=email_id, att_row_id=att_row_id,
                          sanitized=sanitized, inline_b64=inline_b64)
    return ids


def _store_inline(conn: sqlite3.Connection, *, email_id: str, att_row_id,
                  sanitized: str, inline_b64: str) -> None:
    """Decode an inline base64url payload, write it to disk and mark the row.

    Decode problems are recorded as ``inline decode: ...`` and disk problems as
    ``disk: ...`` (matching the worker's wording); neither raises.
    """
    try:
        raw = base64.urlsafe_b64decode(_pad(inline_b64).encode("ascii"))
    except Exception as e:  # decoding is pure: any failure means a malformed payload
        _db.mark_attachment_failed(conn, att_row_id, error=f"inline decode: {e}")
        return
    try:
        path = _storage_path_for(email_id, att_row_id, sanitized)
        _write_bytes(path, raw)
    except OSError as e:
        _db.mark_attachment_failed(conn, att_row_id, error=f"disk: {e}")
        return
    _record_stored(conn, att_row_id, path=path,
                   sha256_hex=hashlib.sha256(raw).hexdigest(), size_bytes=len(raw))


def att_row_id_fallback() -> str:
    """Return a random hex placeholder id for a stub's provisional path.

    Used only when the Gmail attachment id is missing at stub time, so the path
    template always produces something; the real path is written once the bytes
    are stored.
    """
    return uuid.uuid4().hex


# ---------------------------------------------------------------------------- phase 2: worker


def drain_pending(conn_factory, client: _gmail.GmailClient, account_id: str,
                  *, limit: int = 50) -> int:
    """Download up to `limit` pending attachments for `account_id`.

    Returns count of successfully downloaded attachments. Called after each
    account's sync completes so large files don't block the sync loop. Each
    attachment is processed on its own connection and committed individually,
    so recorded failures persist too.

    NOTE(review): ``limit`` is applied by ``_db.pending_attachments`` *before*
    rows are filtered by ``account_id`` here, so pending rows of other accounts
    can crowd this account out. Filtering in the query would fix that — confirm
    whether ``pending_attachments`` can accept an account filter.
    """
    conn = conn_factory()
    try:
        pending = _db.pending_attachments(conn, limit=limit)
    finally:
        conn.close()

    downloaded = 0
    for row in pending:
        if row["account_id"] != account_id:
            continue
        conn = conn_factory()
        try:
            if _download_one(conn, client, row):
                downloaded += 1
            conn.commit()
        finally:
            conn.close()
    return downloaded


def _download_one(conn: sqlite3.Connection, client: _gmail.GmailClient, row) -> bool:
    """Fetch, decode and store one pending attachment row; the caller commits.

    ``row`` is read for ``id``, ``email_id``, ``gmail_message_id``,
    ``gmail_attachment_id`` and ``sanitized_filename``. Gmail errors
    (``RETRYABLE`` / ``GmailError``), empty payloads, decode errors and disk
    errors are recorded with ``_db.mark_attachment_failed`` and reported by
    returning False. On success the row is marked downloaded and True is
    returned. If another downloaded attachment has identical bytes (same
    SHA-256) and its file is still on disk, this row reuses that file.
    """
    att_id = row["id"]
    try:
        resp = client.get_attachment(row["gmail_message_id"], row["gmail_attachment_id"])
    except _errors.RETRYABLE as e:
        _db.mark_attachment_failed(conn, att_id, error=f"transient: {type(e).__name__}")
        return False
    except _errors.GmailError as e:
        _db.mark_attachment_failed(conn, att_id, error=f"{type(e).__name__}: {e}")
        return False

    data_b64 = resp.get("data")
    if not data_b64:
        _db.mark_attachment_failed(conn, att_id, error="empty data in response")
        return False
    try:
        raw = base64.urlsafe_b64decode(_pad(data_b64).encode("ascii"))
    except Exception as e:
        _db.mark_attachment_failed(conn, att_id, error=f"decode: {e}")
        return False
    sha = hashlib.sha256(raw).hexdigest()

    # Dedup: reuse an identical, already-stored file — but only if it still
    # exists, otherwise this row would be "downloaded" with no bytes on disk.
    existing = _find_existing_by_sha(conn, sha, exclude_id=att_id)
    existing_path = existing["storage_path"] if existing is not None else None
    if existing_path and os.path.isfile(existing_path):
        _record_stored(conn, att_id, path=existing_path, sha256_hex=sha, size_bytes=len(raw))
        return True

    try:
        # _storage_path_for creates directories, so it can raise OSError too.
        path = _storage_path_for(row["email_id"], att_id, row["sanitized_filename"])
        _write_bytes(path, raw)
    except OSError as e:
        _db.mark_attachment_failed(conn, att_id, error=f"disk: {e}")
        return False
    _record_stored(conn, att_id, path=path, sha256_hex=sha, size_bytes=len(raw))
    return True


def _find_existing_by_sha(conn: sqlite3.Connection, sha: str, *,
                          exclude_id: str) -> Optional[sqlite3.Row]:
    """Return one downloaded attachment row with this SHA-256 other than `exclude_id`, or None."""
    cur = conn.cursor()
    cur.execute(
        "SELECT * FROM email_attachments WHERE sha256_hex = ? AND id != ? "
        "AND download_status = 'downloaded' LIMIT 1",
        (sha, exclude_id),
    )
    return cur.fetchone()


# ---------------------------------------------------------------------------- utils


def _pad(b64: str) -> str:
    """Return `b64` padded with '=' to a length that is a multiple of 4."""
    return b64 + "=" * (-len(b64) % 4)


def _write_bytes(path: str, data: bytes) -> None:
    """Atomically write `data` to `path` with owner-only (0o600) permissions.

    Bytes go to ``<path>.tmp`` and are then moved over ``path`` with
    ``os.replace``, so ``path`` never holds a partial file. The temp file is
    created with mode 0o600 from the start, so there is no window with default
    permissions, and it is removed if anything fails before the rename.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".tmp"
    # O_BINARY exists only on Windows; without it the fd would be in text mode.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(tmp, flags, 0o600)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        # O_TRUNC keeps the mode of a pre-existing tmp file, so enforce it.
        os.chmod(tmp, 0o600)
        os.replace(tmp, path)
    except BaseException:
        with contextlib.suppress(OSError):
            os.remove(tmp)
        raise