"""Redaction gateway — `POST /scrub` + `POST /rehydrate`. The privacy boundary between sovereign LP data and the Claude API. An agent sends its assembled LP-specific context to `/scrub`; we de-identify it (the real values never leave this box) and return placeholder-only text the agent forwards to Claude. Claude reasons over `[PERSON_1] introduced [PERSON_2] to [FUND_1]` and replies in the same placeholders; the agent sends Claude's reply to `/rehydrate`, which swaps the real values back in for human review. Design: * Detection logic is the VENDORED reference engine (app/redaction/scrub.py), never reimplemented — parity is by construction (its leak test must pass). * The pseudonym map {token -> real_value} is the de-anonymization key. It is the ONE place real values live; held server-side keyed by an opaque map_handle in a TTL-swept local store on /data (0700 dir / 0600 file — never world-readable), NEVER returned in full, NEVER logged, NEVER in a Claude-bound payload. * The caller-supplied `known_entities` dictionary is itself a slice of the LP list — treated as sensitive: used transiently for the scrub, never persisted beyond the resulting tokens, never logged or echoed. * The local-Qwen NER backstop is LOAD-BEARING, not optional, and FAILS CLOSED: if Qwen is unreachable / returns a malformed or empty-schema result under ner=auto/qwen, /scrub returns 422 and emits nothing rather than passing name-blind text to Claude. Descriptive re-identifiers it flags are redacted, and if a substantial flagged span cannot be located+removed from the final text we ALSO fail closed (no identifier-blind prose reaches Claude). This gateway does NOT call Claude. It is the scrub/rehydrate transform pair plus the server-held map. """ from __future__ import annotations import asyncio import json import logging import os import re import sqlite3 import time import uuid from datetime import datetime, timezone from typing import Any, Optional import httpx from fastapi import APIRouter from fastapi.responses import JSONResponse from pydantic import BaseModel from .config import Settings from .redaction import scrub as engine # vendored parity-locked engine logger = logging.getLogger("spark-control.redaction") DEFAULT_TTL_SECONDS = 7200 # 2h — spans a human-review round-trip QWEN_NER_TIMEOUT = 60.0 QWEN_NER_MAX_CHARS = 24000 # guard the NER prompt size per item # A descriptive re-identifier span is "substantial" (and so must be removable, or # we fail closed) when it's a real phrase, not model noise like "the founder". DESCRIPTIVE_MIN_WORDS = 4 DESCRIPTIVE_MIN_CHARS = 25 # ────────────────────────── typed control-flow errors ────────────────────────── class NerUnavailable(RuntimeError): """Raised from the NER pass for ANY unreachable/malformed/empty-schema result, so the endpoint can fail closed (422) without brittle string matching.""" class _Contract(Exception): """A documented gateway error. Carries the exact top-level body shape the handover contract specifies (e.g. {"error":"tier1_detected","spans":[...]}), returned via JSONResponse so keys sit at top level (NOT wrapped under FastAPI's "detail").""" def __init__(self, status: int, body: dict) -> None: self.status = status self.body = body # ────────────────────────── server-held pseudonym map store ────────────────────────── class MapStore: """TTL-swept local store for pseudonym maps, keyed by map_handle. Stored on the /data volume so an in-flight task survives a container restart. Holds ONLY the {token -> real_value} map (the de-anon key) — never the raw caller dictionary, never any Claude-bound text. The db + its WAL/journal/shm sidecars are created 0600 under a 0700 dir, so no other local user/process can read the real values. Rows TTL-expired. """ def __init__(self, db_path: str, ttl_seconds: int = DEFAULT_TTL_SECONDS) -> None: self.db_path = db_path self.ttl_seconds = ttl_seconds d = os.path.dirname(db_path) or "." try: os.makedirs(d, mode=0o700, exist_ok=True) os.chmod(d, 0o700) except Exception as e: logger.warning("could not tighten map dir perms on %s: %s", d, e) # Create the db (and sidecars) under a tight umask so they're 0600. old_umask = os.umask(0o077) try: self._init_db() for suffix in ("", "-wal", "-shm", "-journal"): p = db_path + suffix if os.path.exists(p): try: os.chmod(p, 0o600) except Exception: pass finally: os.umask(old_umask) def _conn(self) -> sqlite3.Connection: c = sqlite3.connect(self.db_path) c.row_factory = sqlite3.Row return c def _init_db(self) -> None: with self._conn() as c: c.execute( """CREATE TABLE IF NOT EXISTS pseudonym_maps ( map_handle TEXT PRIMARY KEY, task_id TEXT NOT NULL, token_map TEXT NOT NULL, created_at REAL NOT NULL, expires_at REAL NOT NULL )""" ) def _sweep(self, c: sqlite3.Connection) -> None: c.execute("DELETE FROM pseudonym_maps WHERE expires_at < ?", (time.time(),)) def create(self, task_id: str, token_map: dict) -> tuple[str, float]: handle = uuid.uuid4().hex now = time.time() expires = now + self.ttl_seconds with self._conn() as c: self._sweep(c) c.execute( "INSERT INTO pseudonym_maps (map_handle, task_id, token_map, created_at, expires_at) VALUES (?,?,?,?,?)", (handle, task_id, json.dumps(token_map), now, expires), ) return handle, expires def extend(self, map_handle: str, token_map: dict) -> float: now = time.time() expires = now + self.ttl_seconds with self._conn() as c: self._sweep(c) cur = c.execute( "UPDATE pseudonym_maps SET token_map=?, expires_at=? WHERE map_handle=? AND expires_at>=?", (json.dumps(token_map), expires, map_handle, now), ) if cur.rowcount == 0: raise KeyError("map_handle not found or expired") return expires def get(self, map_handle: str) -> Optional[dict]: """Return the token_map, None if unknown, or raises _Expired if TTL lapsed.""" with self._conn() as c: row = c.execute( "SELECT token_map, expires_at FROM pseudonym_maps WHERE map_handle=?", (map_handle,), ).fetchone() if row is None: return None if row["expires_at"] < time.time(): raise _Expired() return json.loads(row["token_map"]) class _Expired(Exception): pass def _state_from_map(token_map: dict) -> engine.ScrubState: """Reconstruct a ScrubState from a stored token_map so a reused map_handle keeps token assignment stable (same surface -> same token) and continues numbering for new entities. Does not modify the vendored engine.""" st = engine.ScrubState() st.token_map = dict(token_map) for tok, surface in token_map.items(): m = re.match(r"\[([A-Z]+)_(\d+)\]", tok) if not m: continue ttype, n = m.group(1), int(m.group(2)) st._by_value[(ttype, surface)] = tok if ttype in st._counters: st._counters[ttype] = max(st._counters[ttype], n) return st # ────────────────────────── local-Qwen NER backstop ────────────────────────── _NER_SYSTEM = ( "You are a PII extraction engine inside a privacy redaction gateway. You receive text " "in which known names and structured identifiers may ALREADY be replaced by placeholder " "tokens shaped like [PERSON_1] or [AMOUNT_2]. Your job is to find what is NOT yet redacted. " "Return ONLY a single JSON object, no prose, no code fence. Schema:\n" '{"entities":[{"text":"","type":"PERSON|ORG|FUND|LOC"}],' '"descriptive":[{"span":"\"}]}\n" "Rules: include real person names, company/org names, fund names, and place names that are " "NOT already a [TOKEN]. NEVER include any [TYPE_N] placeholder. 'text' and 'span' must be " "exact substrings copied from the input. If nothing is found, return both arrays empty." ) def _strip_think(s: str) -> str: """Remove any ... block so its braces can't confuse JSON extraction.""" return re.sub(r".*?", "", s, flags=re.DOTALL | re.IGNORECASE).strip() def _parse_ner_json(content: str) -> Any: s = _strip_think(content).strip() if s.startswith("```"): s = re.sub(r"^```[a-zA-Z]*\n?", "", s) s = re.sub(r"\n?```$", "", s).strip() try: return json.loads(s) except Exception: a, b = s.find("{"), s.rfind("}") if a != -1 and b != -1 and b > a: return json.loads(s[a : b + 1]) raise class QwenNER: """Synchronous NER caller (scrub() invokes ner_fn synchronously, so the whole scrub runs in a threadpool and this uses a sync HTTP client). Fails CLOSED: any unreachable/malformed/empty-schema/truncated result raises NerUnavailable, so the endpoint returns 422 rather than emitting name-blind text.""" def __init__(self, base_url: str, model_id: str) -> None: self.base_url = base_url self.model_id = model_id self.descriptive: list[str] = [] def _call(self, text: str) -> dict: body = { "model": self.model_id, "messages": [ {"role": "system", "content": _NER_SYSTEM}, {"role": "user", "content": text[:QWEN_NER_MAX_CHARS]}, ], "temperature": 0, "max_tokens": 2048, "response_format": {"type": "json_object"}, "chat_template_kwargs": {"enable_thinking": False}, } try: with httpx.Client(timeout=QWEN_NER_TIMEOUT) as c: r = c.post(f"{self.base_url}/v1/chat/completions", json=body) except Exception as e: raise NerUnavailable(f"local Qwen NER unreachable: {e}") if r.status_code != 200: raise NerUnavailable(f"local Qwen NER HTTP {r.status_code}") try: choice = r.json()["choices"][0] if choice.get("finish_reason") == "length": # Truncated NER output is unreliable -> fail closed. raise NerUnavailable("local Qwen NER output truncated (finish_reason=length)") data = _parse_ner_json(choice["message"]["content"]) except NerUnavailable: raise except Exception as e: raise NerUnavailable(f"local Qwen NER unparseable: {e}") # Schema validation: json_object guarantees valid JSON, not a populated # schema. An empty {} or a missing/!list field is a fail-OPEN trap -> fail closed. if (not isinstance(data, dict) or not isinstance(data.get("entities"), list) or not isinstance(data.get("descriptive"), list)): raise NerUnavailable("local Qwen NER returned a malformed/empty schema") return data def ner_fn(self, text: str): """text -> [(surface, type)] for the engine to tokenize. Side-effect: stashes descriptive re-identifier spans for the gateway to redact post-scrub.""" data = self._call(text) for d in data.get("descriptive", []) or []: span = (d.get("span") or "").strip() if isinstance(d, dict) else str(d).strip() if span and not engine._TOKEN_RE.search(span): self.descriptive.append(span) out = [] for e in data.get("entities", []) or []: if not isinstance(e, dict): continue t = (e.get("text") or "").strip() ty = (e.get("type") or "").strip().upper() if t and not engine._TOKEN_RE.search(t): out.append((t, ty if ty in engine.TOKEN_TYPES else "PERSON")) return out def _apply_tokenmap_to_span(span: str, token_map: dict) -> str: """Rewrite real values inside a descriptive span into their tokens, longest value first, so a span the NER returned BEFORE its embedded names were tokenized still matches the final scrubbed text (the P0 fail-open fix).""" s = span for tok in sorted(token_map, key=lambda t: len(token_map.get(t, "")), reverse=True): val = token_map[tok] if val: s = s.replace(val, tok) return s def _redact_descriptive(scrubbed: str, spans: list[str], token_map: dict, item_id: str): """Remove descriptive re-identifier spans from the final scrubbed text. For a SUBSTANTIAL span that cannot be located+removed (even after applying the token map), FAIL CLOSED (422) — never let identifier-blind prose reach Claude. Short/ generic model-noise spans are flagged but not blanket-removed (avoid over-redaction).""" flags: list[dict] = [] for span in sorted(set(spans), key=len, reverse=True): span = (span or "").strip() if not span: continue substantial = (len(span.split()) >= DESCRIPTIVE_MIN_WORDS) or (len(span) >= DESCRIPTIVE_MIN_CHARS) removed = False for variant in (span, _apply_tokenmap_to_span(span, token_map)): if variant and variant in scrubbed: scrubbed = scrubbed.replace(variant, "[redacted]") flags.append({"item": item_id, "span": span, "action": "redacted"}) removed = True break if not removed: if substantial: raise _Contract(422, {"error": "descriptive_unredactable", "item": item_id}) flags.append({"item": item_id, "span": span, "action": "skipped_generic"}) return scrubbed, flags async def _current_model_id(base_url: str) -> Optional[str]: try: async with httpx.AsyncClient(timeout=5.0) as c: r = await c.get(f"{base_url}/v1/models") if r.status_code == 200: data = r.json().get("data") or [] return data[0]["id"] if data else None except Exception: return None return None # ────────────────────────── request / response models ────────────────────────── class ScrubItem(BaseModel): id: str text: str class KnownEntities(BaseModel): persons: list[str] = [] orgs: list[str] = [] funds: list[str] = [] emails: list[str] = [] locations: list[str] = [] class BucketSpec(BaseModel): amounts: bool = False dates: bool = False class ScrubBody(BaseModel): task_id: str actor: Optional[str] = None items: list[ScrubItem] known_entities: Optional[KnownEntities] = None tier1_action: str = "drop" bucket: BucketSpec = BucketSpec() ner: str = "auto" map_handle: Optional[str] = None class RehydrateItem(BaseModel): id: str text: str class RehydrateBody(BaseModel): task_id: str map_handle: str items: list[RehydrateItem] actor: Optional[str] = None strict: bool = True def _bare(tokens: list[str]) -> list[str]: """[PERSON_1] -> PERSON_1 for the tokens_used field (matches the handover contract).""" return [t.strip("[]") for t in tokens] # ────────────────────────── router ────────────────────────── def build_router(settings: Settings, map_store: MapStore) -> APIRouter: router = APIRouter() def _qwen_base() -> str: return f"http://{settings.spark1_host}:{settings.vllm_port}" async def _do_scrub(body: ScrubBody): if not body.items: raise _Contract(400, {"error": "bad_request", "detail": "items is required"}) if body.tier1_action not in ("drop", "reject"): raise _Contract(400, {"error": "bad_request", "detail": "tier1_action must be 'drop' or 'reject'"}) if body.ner not in ("auto", "rules_only", "qwen"): raise _Contract(400, {"error": "bad_request", "detail": "ner must be 'auto', 'rules_only', or 'qwen'"}) # Caller dictionary -> engine shape. Sensitive: transient, never logged/echoed. known = None if body.known_entities: ke = body.known_entities known = {"persons": ke.persons, "orgs": ke.orgs, "funds": ke.funds, "emails": ke.emails, "locations": ke.locations} # NER backstop wiring (load-bearing under auto/qwen; fail-closed if unreachable). ner_enabled = body.ner in ("auto", "qwen") model_id: Optional[str] = None if ner_enabled: model_id = await _current_model_id(_qwen_base()) if not model_id: raise _Contract(422, { "error": "ner_unavailable", "detail": "local Qwen NER is required (ner=%s) but no model is loaded; load a model " "or call with ner='rules_only' to knowingly skip the NER backstop" % body.ner, }) # Reuse/extend an existing task map for stable cross-call tokens, else fresh. if body.map_handle: try: existing = map_store.get(body.map_handle) except _Expired: raise _Contract(410, {"error": "map_expired"}) if existing is None: raise _Contract(400, {"error": "unknown_map_handle"}) state = _state_from_map(existing) else: state = engine.ScrubState() out_items: list[dict] = [] descriptive_flags: list[dict] = [] tier1_total = 0 bucket_on = bool(body.bucket.amounts or body.bucket.dates) def _run_one(text: str, ner_obj: Optional[QwenNER]): ner_fn = ner_obj.ner_fn if ner_obj is not None else None return engine.scrub(text, known_entities=known, bucket=bucket_on, state=state, ner_fn=ner_fn) for item in body.items: item_ner = QwenNER(_qwen_base(), model_id) if (ner_enabled and model_id) else None tier1_before = len(state.tier1_dropped) try: scrubbed, _full_map, audit = await asyncio.to_thread(_run_one, item.text, item_ner) except NerUnavailable as e: raise _Contract(422, {"error": "ner_unavailable", "detail": str(e)[:300]}) except _Contract: raise except Exception: logger.exception("scrub failed for item %s", item.id) # Generic message only — never interpolate engine exception text. raise _Contract(500, {"error": "scrub_failed"}) # Per-item Tier-1 delta (state.tier1_dropped accumulates across items). item_tier1_kinds = state.tier1_dropped[tier1_before:] if body.tier1_action == "reject" and item_tier1_kinds: # KINDS + item id only — never the raw Tier-1 values. raise _Contract(422, { "error": "tier1_detected", "spans": [{"item": item.id, "kinds": sorted(set(item_tier1_kinds))}], }) tier1_total += len(item_tier1_kinds) # Redact descriptive re-identifiers (fail-closed on a substantial miss). if item_ner is not None and item_ner.descriptive: scrubbed, flags = _redact_descriptive( scrubbed, item_ner.descriptive, state.token_map, item.id) descriptive_flags.extend(flags) out_items.append({ "id": item.id, "scrubbed_text": scrubbed, "tokens_used": _bare(engine.residual_tokens(scrubbed)), }) # Persist/refresh the resulting token map (the de-anon key) under a handle. token_map = dict(state.token_map) if body.map_handle: try: expires = map_store.extend(body.map_handle, token_map) except KeyError: raise _Contract(410, {"error": "map_expired"}) handle = body.map_handle else: handle, expires = map_store.create(body.task_id, token_map) # tier2_tokenized = total placeholder OCCURRENCES across items; # distinct_entities = distinct tokens in the map. tier2_occurrences = sum(len(engine.residual_tokens(it["scrubbed_text"])) for it in out_items) stats = { "tier1_dropped": tier1_total, "tier2_tokenized": tier2_occurrences, "distinct_entities": len(token_map), "descriptive_flags": descriptive_flags, } return { "task_id": body.task_id, "map_handle": handle, "items": out_items, "stats": stats, "expires_at": datetime.fromtimestamp(expires, tz=timezone.utc).isoformat(), } @router.post("/scrub") async def scrub_endpoint(body: ScrubBody): try: return await _do_scrub(body) except _Contract as e: return JSONResponse(status_code=e.status, content=e.body) async def _do_rehydrate(body: RehydrateBody): if not body.items: raise _Contract(400, {"error": "bad_request", "detail": "items is required"}) try: token_map = map_store.get(body.map_handle) except _Expired: raise _Contract(410, {"error": "map_expired"}) if token_map is None: # Unknown handle == nothing to restore (doc: 410 on lapsed OR unknown handle). raise _Contract(410, {"error": "map_expired"}) out_items = [] total_subbed = 0 all_unknown: set[str] = set() for item in body.items: present = engine.residual_tokens(item.text) unknown = [t for t in present if t not in token_map] if unknown and body.strict: # Tripwire: a token with no map entry == hallucinated/smuggled. raise _Contract(409, {"error": "unknown_tokens", "tokens": sorted(set(unknown))}) all_unknown.update(unknown) rehydrated = engine.rehydrate(item.text, token_map) total_subbed += sum(1 for t in present if t in token_map) out_items.append({"id": item.id, "rehydrated_text": rehydrated}) return { "items": out_items, "stats": {"tokens_substituted": total_subbed, "unknown_tokens": sorted(all_unknown)}, } @router.post("/rehydrate") async def rehydrate_endpoint(body: RehydrateBody): try: return await _do_rehydrate(body) except _Contract as e: return JSONResponse(status_code=e.status, content=e.body) return router