"""Scrub/rehydrate CLIENT — one contract, two backends, switched by SCRUB_BACKEND. SCRUB_BACKEND=local (default) -> the in-repo deterministic scrubber (scrub.py); the known-entity dictionary is built from the CRM and the pseudonym map is held in this process. SCRUB_BACKEND=gateway -> Spark Control POST /scrub + /rehydrate (the eventual bypass-proof enforcement point; the map lives on the Spark). Same request/response shapes, so the Architect grounding code never changes when we flip the switch. Agents call THIS, never scrub.py directly, so enforcement can move to the gateway with no code change. The local map registry is in-process and short-lived (one grounding task). """ import os import sys import uuid sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) import scrub as R # noqa: E402 SCRUB_BACKEND = os.environ.get("SCRUB_BACKEND", "local").lower() # in-process token maps for the local backend, keyed by opaque handle (the map is the # de-anon key — kept local, never serialized to a prompt or to interaction_log). _MAPS = {} _KNOWN_CACHE = {} def _known_entities(db_path): """Build the CRM known-entity dictionary. FAIL CLOSED: never substitute an empty dictionary on error (that would silently run the scrubber name-blind) — propagate the exception so the caller refuses to emit. A legitimately-empty CRM is fine; a failed READ is not, and the two must not be conflated.""" if not db_path: raise ValueError("redaction: db_path is required for the local scrub backend (fail closed)") if db_path not in _KNOWN_CACHE: _KNOWN_CACHE[db_path] = R.build_known_entities(db_path) # raises on read failure return _KNOWN_CACHE[db_path] class Boundary: """The redaction boundary an agent routes Claude-bound LP context through. ner_fn (text -> [(surface, type)]) is the local-model NER backstop for UNKNOWN names the dictionary can't know — the single largest residual. In production the grounding flow passes the local-Qwen NER here; without it the dictionary+regex path is the floor, so callers must minimize-first and fail closed if the local model is down. """ def __init__(self, db_path=None, actor="architect", backend=None, ner_fn=None): self.db_path = db_path self.actor = actor self.backend = (backend or SCRUB_BACKEND) self.ner_fn = ner_fn # db_path required for BOTH backends: the CALLER supplies the known-entity dictionary # (Option 1) so Spark Control stays generic/portable and needs no CRM access; the # gateway only adds its local-Qwen NER backstop on top. if not db_path: raise ValueError("redaction: db_path is required (the caller supplies the entity dictionary; fail closed)") # ── scrub ── def scrub(self, texts, task_id=None, bucket=True, conn=None): """De-identify a list of texts under ONE shared token space. Returns {handle, items:[scrubbed,...], stats}. The real->token map is retained locally (local backend) or on the gateway (keyed by handle).""" task_id = task_id or f"task_{uuid.uuid4().hex[:12]}" if self.backend == "gateway": return self._scrub_gateway(texts, task_id, bucket) # local — known dict (fail-closed) + the NER backstop for unknown names state = R.ScrubState() known = _known_entities(self.db_path) items, last_audit = [], None for t in texts: out, _m, audit = R.scrub(t, known_entities=known, bucket=bucket, state=state, ner_fn=self.ner_fn) items.append(out) last_audit = audit handle = f"mh_{uuid.uuid4().hex[:16]}" _MAPS[handle] = dict(state.token_map) if conn is not None and last_audit is not None: try: R.log_scrub(conn, self.actor, last_audit, task=task_id, session_id=handle, source="mcp") conn.commit() except Exception: pass return {"handle": handle, "items": items, "stats": {"tokens": len(state.token_map), "tier1_dropped": len(state.tier1_dropped)}} def _scrub_gateway(self, texts, task_id, bucket): sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest")) import config, http_util # noqa: E402 # Option 1: WE build the dictionary from the CRM and supply it, so the gateway needs # no CRM access. It is sensitive (a slice of the LP list) but goes only to the # sovereign Spark and must be held transiently with the map, never logged/forwarded. body = {"task_id": task_id, "actor": self.actor, "items": [{"id": str(i), "text": t} for i, t in enumerate(texts)], "known_entities": _known_entities(self.db_path), "tier1_action": "drop", "bucket": {"amounts": bucket, "dates": bucket}, "ner": "auto"} status, data = http_util.request("POST", f"{config.SPARK_CONTROL_URL}/scrub", body, verify=config.SPARK_VERIFY_TLS) if status != 200: raise RuntimeError(f"/scrub -> {status}: {data}") return {"handle": data["map_handle"], "items": [it["scrubbed_text"] for it in data["items"]], "stats": data.get("stats", {})} # ── rehydrate ── def rehydrate(self, text, handle, strict=True, conn=None, human_decision="pending", reviewer_id=None): """Substitute real values back in. `strict` flags any placeholder with no map entry (a Claude-hallucinated/smuggled token) instead of silently passing it.""" if self.backend == "gateway": return self._rehydrate_gateway(text, handle, strict) token_map = _MAPS.get(handle, {}) out = R.rehydrate(text, token_map) residual = R.residual_tokens(out) if strict and residual: # FAIL CLOSED: a token with no map entry means Claude hallucinated/smuggled a # placeholder. Do NOT return the de-anonymized text alongside the error — hand # back the still-tokenized input so no real value is materialized. return {"text": text, "unknown_tokens": residual, "error": "unknown_tokens"} if conn is not None: try: R.log_rehydrate(conn, self.actor, tokens_rehydrated=len(token_map), residual=len(residual), human_decision=human_decision, reviewer_id=reviewer_id, session_id=handle, source="mcp") conn.commit() except Exception: pass return {"text": out, "unknown_tokens": residual} def _rehydrate_gateway(self, text, handle, strict): sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest")) import config, http_util # noqa: E402 body = {"task_id": handle, "map_handle": handle, "actor": self.actor, "items": [{"id": "0", "text": text}], "strict": strict} status, data = http_util.request("POST", f"{config.SPARK_CONTROL_URL}/rehydrate", body, verify=config.SPARK_VERIFY_TLS) if status != 200: return {"text": text, "unknown_tokens": [], "error": f"rehydrate {status}"} return {"text": data["items"][0]["rehydrated_text"], "unknown_tokens": data.get("stats", {}).get("unknown_tokens", [])} def forget(self, handle): """Drop the local map for a finished task (the de-anon key is short-lived).""" _MAPS.pop(handle, None)