""" AES-256-GCM encryption for OAuth refresh tokens at rest. Key material comes from CONFIG.secret_key_b64 (env: CRM_GMAIL_SECRET_KEY). Must be at least 32 bytes of entropy, base64-encoded. Storage format (as stored in BLOB columns): version(1 byte) || nonce(12 bytes) || ciphertext+tag(N bytes) version = 1 for AES-GCM-256. Uses the `cryptography` library. If not available (optional at scaffold time), the OAuth fallback path is disabled with a clear error — DWD path is unaffected. """ import base64 import os import secrets from typing import Optional try: from cryptography.hazmat.primitives.ciphers.aead import AESGCM # type: ignore _AVAILABLE = True except ImportError: # pragma: no cover AESGCM = None # type: ignore _AVAILABLE = False VERSION = 1 NONCE_LEN = 12 class CryptoUnavailable(RuntimeError): pass def _load_key(secret_key_b64: Optional[str]) -> bytes: if not secret_key_b64: raise CryptoUnavailable( "CRM_GMAIL_SECRET_KEY not set; cannot encrypt/decrypt OAuth tokens. " "DWD auth does not require this." ) try: key = base64.b64decode(secret_key_b64) except Exception as e: raise CryptoUnavailable(f"CRM_GMAIL_SECRET_KEY not valid base64: {e}") from e if len(key) < 32: raise CryptoUnavailable( f"CRM_GMAIL_SECRET_KEY decodes to {len(key)} bytes; need >= 32." ) return key[:32] # AES-256 def encrypt(plaintext: bytes, *, secret_key_b64: Optional[str]) -> bytes: if not _AVAILABLE: raise CryptoUnavailable("cryptography library not installed") key = _load_key(secret_key_b64) nonce = secrets.token_bytes(NONCE_LEN) ct = AESGCM(key).encrypt(nonce, plaintext, None) return bytes([VERSION]) + nonce + ct def decrypt(blob: bytes, *, secret_key_b64: Optional[str]) -> bytes: if not _AVAILABLE: raise CryptoUnavailable("cryptography library not installed") if not blob or len(blob) < 1 + NONCE_LEN + 16: raise ValueError("ciphertext too short") version = blob[0] if version != VERSION: raise ValueError(f"unsupported crypto version: {version}") nonce = blob[1:1 + NONCE_LEN] ct = blob[1 + NONCE_LEN:] key = _load_key(secret_key_b64) return AESGCM(key).decrypt(nonce, ct, None) def generate_secret_key_b64() -> str: """Helper for initial setup: prints a fresh key you can drop into env.""" return base64.b64encode(os.urandom(32)).decode("ascii")