Architect grounding boundary: redaction/re-hydration privacy gate (v0.1.0:55)

Phase 1 Workstream D. Lets the Architect ground the thesis in REAL recurring LP
objections without any LP identity reaching the Claude API. Layered, defense-in-depth,
fail-closed by construction (docs/redaction-rehydration.md).

backend/redaction/:
- scrub.py: the leak-proof core. Drops Tier-1 (labelled/structured account/wire/SSN/
  IBAN/SWIFT/passport, separator-tolerant); tokenizes known LP entities (dictionary from
  the canonical layer, unicode-folded + hyphen-extended) and structured PII (emails,
  scheme-less/social URLs, intl+ext phones, currency-cued amounts, ISO/worded/numeric/
  quarter dates, addresses, bare long digit runs); pre-neutralizes injected [TYPE_N]
  strings; single-pass rehydrate; metadata-only audit logging (the pseudonym map is the
  de-anon key — local-only, never logged/sent). Hardened across THREE adversarial
  leak-hunts (worded/coded amounts, intl phones, NFD/ligature/zero-width names, slash/
  comma SSN, SWIFT, alpha-prefixed accounts, substance-preserving false-positive fixes).
- client.py: Boundary — one scrub/rehydrate contract, SCRUB_BACKEND=local (default) or
  gateway (Spark Control /scrub + /rehydrate). Fails closed (db_path required; dictionary
  build errors propagate; strict rehydrate returns tokenized-not-de-anon text).
- test_scrub_leak.py, test_reidentification.py: golden-file leak + re-identification
  suites (synthetic only, guardrail #9), regression-locking every leak-hunt vector.

backend/mcp/architect_grounding.py: the flow — retrieve (local) -> minimize-first
(local Qwen) -> scrub (+ local-Qwen NER backstop for unknown names) -> Claude over the
de-identified register only -> re-hydrate locally -> human review. FAILS CLOSED if the
local model is unreachable or a hallucinated token appears. test_grounding_boundary.py
proves nothing sensitive reaches Claude and the three fail-closed paths.

server.py: POST /api/architect/ground (admin) wires retrieval -> ground_objections.
docker_entrypoint.sh: SCRUB_BACKEND (default local). docs/spark-control-scrub-endpoints.md:
the gateway handover spec (Option 1 — caller supplies the entity dictionary).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Keysat
2026-06-05 17:06:29 -05:00
parent 300041a7ec
commit 2e70b34592
12 changed files with 1371 additions and 4 deletions
+139
View File
@@ -0,0 +1,139 @@
"""Scrub/rehydrate CLIENT — one contract, two backends, switched by SCRUB_BACKEND.
SCRUB_BACKEND=local (default) -> the in-repo deterministic scrubber (scrub.py);
the known-entity dictionary is built from the CRM
and the pseudonym map is held in this process.
SCRUB_BACKEND=gateway -> Spark Control POST /scrub + /rehydrate (the eventual
bypass-proof enforcement point; the map lives on the
Spark). Same request/response shapes, so the Architect
grounding code never changes when we flip the switch.
Agents call THIS, never scrub.py directly, so enforcement can move to the gateway with no
code change. The local map registry is in-process and short-lived (one grounding task).
"""
import os
import sys
import uuid
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import scrub as R # noqa: E402
SCRUB_BACKEND = os.environ.get("SCRUB_BACKEND", "local").lower()
# in-process token maps for the local backend, keyed by opaque handle (the map is the
# de-anon key — kept local, never serialized to a prompt or to interaction_log).
_MAPS = {}
_KNOWN_CACHE = {}
def _known_entities(db_path):
"""Build the CRM known-entity dictionary. FAIL CLOSED: never substitute an empty
dictionary on error (that would silently run the scrubber name-blind) — propagate
the exception so the caller refuses to emit. A legitimately-empty CRM is fine; a
failed READ is not, and the two must not be conflated."""
if not db_path:
raise ValueError("redaction: db_path is required for the local scrub backend (fail closed)")
if db_path not in _KNOWN_CACHE:
_KNOWN_CACHE[db_path] = R.build_known_entities(db_path) # raises on read failure
return _KNOWN_CACHE[db_path]
class Boundary:
"""The redaction boundary an agent routes Claude-bound LP context through.
ner_fn (text -> [(surface, type)]) is the local-model NER backstop for UNKNOWN
names the dictionary can't know — the single largest residual. In production the
grounding flow passes the local-Qwen NER here; without it the dictionary+regex path
is the floor, so callers must minimize-first and fail closed if the local model is down.
"""
def __init__(self, db_path=None, actor="architect", backend=None, ner_fn=None):
self.db_path = db_path
self.actor = actor
self.backend = (backend or SCRUB_BACKEND)
self.ner_fn = ner_fn
# db_path required for BOTH backends: the CALLER supplies the known-entity dictionary
# (Option 1) so Spark Control stays generic/portable and needs no CRM access; the
# gateway only adds its local-Qwen NER backstop on top.
if not db_path:
raise ValueError("redaction: db_path is required (the caller supplies the entity dictionary; fail closed)")
# ── scrub ──
def scrub(self, texts, task_id=None, bucket=True, conn=None):
"""De-identify a list of texts under ONE shared token space. Returns
{handle, items:[scrubbed,...], stats}. The real->token map is retained
locally (local backend) or on the gateway (keyed by handle)."""
task_id = task_id or f"task_{uuid.uuid4().hex[:12]}"
if self.backend == "gateway":
return self._scrub_gateway(texts, task_id, bucket)
# local — known dict (fail-closed) + the NER backstop for unknown names
state = R.ScrubState()
known = _known_entities(self.db_path)
items, last_audit = [], None
for t in texts:
out, _m, audit = R.scrub(t, known_entities=known, bucket=bucket, state=state, ner_fn=self.ner_fn)
items.append(out)
last_audit = audit
handle = f"mh_{uuid.uuid4().hex[:16]}"
_MAPS[handle] = dict(state.token_map)
if conn is not None and last_audit is not None:
try:
R.log_scrub(conn, self.actor, last_audit, task=task_id, session_id=handle, source="mcp")
conn.commit()
except Exception:
pass
return {"handle": handle, "items": items,
"stats": {"tokens": len(state.token_map), "tier1_dropped": len(state.tier1_dropped)}}
def _scrub_gateway(self, texts, task_id, bucket):
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest"))
import config, http_util # noqa: E402
# Option 1: WE build the dictionary from the CRM and supply it, so the gateway needs
# no CRM access. It is sensitive (a slice of the LP list) but goes only to the
# sovereign Spark and must be held transiently with the map, never logged/forwarded.
body = {"task_id": task_id, "actor": self.actor,
"items": [{"id": str(i), "text": t} for i, t in enumerate(texts)],
"known_entities": _known_entities(self.db_path),
"tier1_action": "drop", "bucket": {"amounts": bucket, "dates": bucket}, "ner": "auto"}
status, data = http_util.request("POST", f"{config.SPARK_CONTROL_URL}/scrub", body, verify=config.SPARK_VERIFY_TLS)
if status != 200:
raise RuntimeError(f"/scrub -> {status}: {data}")
return {"handle": data["map_handle"], "items": [it["scrubbed_text"] for it in data["items"]],
"stats": data.get("stats", {})}
# ── rehydrate ──
def rehydrate(self, text, handle, strict=True, conn=None, human_decision="pending", reviewer_id=None):
"""Substitute real values back in. `strict` flags any placeholder with no map
entry (a Claude-hallucinated/smuggled token) instead of silently passing it."""
if self.backend == "gateway":
return self._rehydrate_gateway(text, handle, strict)
token_map = _MAPS.get(handle, {})
out = R.rehydrate(text, token_map)
residual = R.residual_tokens(out)
if strict and residual:
# FAIL CLOSED: a token with no map entry means Claude hallucinated/smuggled a
# placeholder. Do NOT return the de-anonymized text alongside the error — hand
# back the still-tokenized input so no real value is materialized.
return {"text": text, "unknown_tokens": residual, "error": "unknown_tokens"}
if conn is not None:
try:
R.log_rehydrate(conn, self.actor, tokens_rehydrated=len(token_map), residual=len(residual),
human_decision=human_decision, reviewer_id=reviewer_id, session_id=handle, source="mcp")
conn.commit()
except Exception:
pass
return {"text": out, "unknown_tokens": residual}
def _rehydrate_gateway(self, text, handle, strict):
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ingest"))
import config, http_util # noqa: E402
body = {"task_id": handle, "map_handle": handle, "actor": self.actor,
"items": [{"id": "0", "text": text}], "strict": strict}
status, data = http_util.request("POST", f"{config.SPARK_CONTROL_URL}/rehydrate", body, verify=config.SPARK_VERIFY_TLS)
if status != 200:
return {"text": text, "unknown_tokens": [], "error": f"rehydrate {status}"}
return {"text": data["items"][0]["rehydrated_text"], "unknown_tokens": data.get("stats", {}).get("unknown_tokens", [])}
def forget(self, handle):
"""Drop the local map for a finished task (the de-anon key is short-lived)."""
_MAPS.pop(handle, None)
+411
View File
@@ -0,0 +1,411 @@
"""Redaction / re-hydration boundary — the privacy gate between Ten31's sovereign
data and the Claude API. Implements docs/redaction-rehydration.md, hardened against an
adversarial leak-hunt (see docs/spark-control-scrub-endpoints.md for the gateway twin).
Defense in depth — NO single layer is trusted as "leak-proof":
1. MINIMIZE-FIRST (caller): a local-Qwen summary strips most identity before scrub runs.
2. PRE-NEUTRALIZE: any pre-existing [TYPE_N]-shaped string in the input is tokenized
first, so every placeholder that reaches Claude is one WE minted (no injection).
3. TIER-1 DROP: labelled/structured account-wire-SSN-IBAN-passport data, separator
tolerant, excised entirely (never tokenized, never in the map).
4. KNOWN-ENTITY tokenize: the LP identities we own (dictionary from the canonical
layer), matched UNICODE-FOLDED (accents/case) with hyphenated-surname extension.
5. STRUCTURED-PII tokenize/bucket: emails, URLs (incl. scheme-less/social), phones
(intl + extensions), amounts (currency words/codes/symbols + worded + ranges),
dates (ISO + worded + numeric + quarter), street addresses, bare long digit runs.
6. NER BACKSTOP (ner_fn, on-infra local Qwen): tokenizes residual unknown person/org/
location names the dictionary can't know. Unknown names are the largest residual,
so callers in production pass ner_fn and FAIL CLOSED if it is unreachable.
The pseudonym map ({token: real_value}) is the de-anonymization key: local-only, NEVER
sent to Claude, NEVER written to interaction_log (only counts).
"""
import json
import re
import sqlite3
import unicodedata
import uuid
from datetime import datetime, timezone
TOKEN_TYPES = ("PERSON", "ORG", "FUND", "EMAIL", "PHONE", "URL", "ADDR", "AMOUNT", "DATE", "LOC", "MISC")
_TOKEN_RE = re.compile(r"\[(?:" + "|".join(TOKEN_TYPES) + r")_\d+\]")
# ── Tier-1: NEVER-SEND (dropped, not tokenized). Separator-tolerant + label-anchored. ──
# Separators allow space/dot/dash/SLASH/COMMA so grouped account/SSN forms can't bypass.
_SEP = r"[\s.\-/,]"
_LABEL = (r"(?:acct|account|a/c|wire|routing|aba|sort\s?code|ssn|social\s?security|tax\s?id|"
r"ein|policy|member|ref)")
TIER1_PATTERNS = [
("ssn", re.compile(r"\b\d{3}" + _SEP + r"\d{2}" + _SEP + r"\d{4}\b")),
("ssn", re.compile(r"(?i)\b(?:ssn|social\s?security|tax\s?id|ein)\b[^\d]{0,12}\(?\d{3}\)?" + _SEP + r"{0,3}\d{2}" + _SEP + r"{0,3}\d{4}\b")),
("iban", re.compile(r"\b[A-Z]{2}\d{2}(?:\s?[A-Z0-9]){11,30}\b")), # IBAN >=15 chars; excludes 12-char ISIN
("swift", re.compile(r"(?i)\b(?:swift|bic)\b[^A-Za-z0-9]{0,8}[A-Z]{4}[A-Z]{2}[A-Z0-9]{2,5}\b")),
("passport", re.compile(r"(?i)\bpassport\b(?:\s?(?:no|number|num|#)\.?)?[^\dA-Za-z]{0,6}[A-Za-z]{0,2}[\s\-]?\d{6,9}\b")),
("labeled_account", re.compile(r"(?i)\b" + _LABEL + r"\b[^\dA-Za-z]{0,14}[#:]?\s*[\dXx](?:[\dXx]" + _SEP + r"?){5,}\b")),
# labelled identifier with a LETTER prefix or an intervening 'no/number/id/ref/to' word
# (e.g. 'acct A123456789012', 'member ID: X4451200931', 'Wire to GB123456789012') — these
# slip the digit-led rule above, the bare-digit catch, and the IBAN floor.
("labeled_account", re.compile(r"(?i)\b" + _LABEL + r"\b(?:[\s.:#\-]{0,3}(?:no|number|num|id|ref|to)\b)?[\s.:#\-]{0,4}[A-Za-z]{0,4}\d[\dA-Za-z]{4,}\b")),
]
# ── structured PII (Tier-2) ────────────────────────────────────────────────────
_EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b")
_URL_RE = re.compile(
r"\bhttps?://[^\s)\]]+"
r"|\bwww\.[^\s)\]]+"
r"|\b(?:[a-z0-9\-]+\.)?(?:linkedin|twitter|github|facebook|instagram|x|substack|medium)\.com/[^\s)\]]+",
re.IGNORECASE)
# Phones: NANP (3-3-4, optional +1, optional extension) OR E.164/international (leading +).
# Tightened so plain 4-4 year ranges ('2019-2024') don't match.
_PHONE_RE = re.compile(
r"(?<![\w.])(?:"
r"(?:\+?1[\s.\-]?)?(?:\(\d{3}\)[\s.\-]?|\d{3}[\s.\-])\d{3}[\s.\-]\d{4}"
r"|\+\d{1,3}(?:[\s.\-]?\d){7,14}"
r")(?:\s?(?:x|ext\.?|extension)\s?\d{1,6})?(?![\w])")
# Amounts: ONLY currency-anchored (symbol / code / currency-word), so non-money quantities
# ('3m tall', 'ten million tokens', '250k followers') are NOT eaten. Bare magnitudes without
# a currency cue are left to minimize-first + NER, which strip real money amounts.
_NUMWORD = (r"(?:one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve|thirteen|"
r"fourteen|fifteen|sixteen|seventeen|eighteen|nineteen|twenty|thirty|forty|fifty|"
r"sixty|seventy|eighty|ninety|hundred|couple|few|several|half|a)")
_MAG = r"(?:mm|bn|tn|thousand|million|billion|trillion|k|m|b)" # longest-first so 'MM' isn't split into 'M'
_AMOUNT_RES = [
re.compile(r"[$€£]\s?\d[\d,. ]*\d?\s?-\s?[$€£]?\s?\d[\d,. ]*\d?(?:\s?" + _MAG + r")?", re.IGNORECASE), # $3-5M range
re.compile(r"[$€£]\s?\d[\d,]*(?:\.\d+)?(?:\s?" + _MAG + r")?", re.IGNORECASE), # $5,000,000 / $5m
re.compile(r"\b(?:USD|EUR|GBP|CHF|CAD|AUD)\s?[$€£]?\s?\d[\d,]*(?:\.\d+)?(?:\s?" + _MAG + r")?", re.IGNORECASE),
re.compile(r"\b\d[\d,]*(?:\.\d+)?\s?(?:dollars?|euros?|pounds?)\b", re.IGNORECASE), # 5,000,000 dollars
re.compile(r"(?i)\b(?:" + _NUMWORD + r"[\s\-]+){1,4}" + _MAG + r"\s+(?:dollars?|euros?|pounds?)\b"), # five million dollars
]
_MONTHS = (r"(?:jan|feb|mar|apr|may|jun|jul|aug|sep|sept|oct|nov|dec)[a-z]*\.?")
_DATE_RES = [
re.compile(r"\b(?:19|20)\d{2}-\d{2}-\d{2}\b"), # ISO
re.compile(r"(?i)\b" + _MONTHS + r"\s+\d{1,2}(?:st|nd|rd|th)?,?\s+(?:19|20)?\d{2}\b"), # March 12, 1986
re.compile(r"(?i)\b\d{1,2}(?:st|nd|rd|th)?\s+" + _MONTHS + r",?\s+(?:19|20)?\d{2}\b"), # 12 March 1986
re.compile(r"\b(?:0?[1-9]|1[0-2])[/.\-](?:0?[1-9]|[12]\d|3[01])[/.\-](?:19|20)?\d{2}\b"), # 3/12/86 (valid m/d only)
re.compile(r"(?i)\bQ[1-4][\s\-]?(?:19|20)\d{2}\b"), # Q1 1986
re.compile(r"(?i)\b" + _MONTHS + r"\s+(?:19|20)\d{2}\b"), # March 1986
]
# Addresses: US number-first, PO Box, and European -strasse/-gasse + 'Rue/Calle/Via X N'.
# Comprehensive international address detection relies on the NER LOC backstop + minimize-first.
_ADDR_RE = re.compile(
r"\bP\.?\s?O\.?\s?Box\s+\d+"
r"|\b\d{1,6}\s+(?:[A-Z][A-Za-z'.]+\s?){1,4}"
r"(?:Street|St|Avenue|Ave|Road|Rd|Lane|Ln|Boulevard|Blvd|Drive|Dr|Court|Ct|Way|Place|Pl|Square|Sq|Terrace|Ter)\b\.?"
r"(?:,?\s+[A-Z][A-Za-z]+)*"
r"|\b[A-Z][A-Za-z]*(?:strasse|straße|gasse|weg)\s+\d{1,5}"
r"|\b(?:Rue|Calle|Via|Avenida)\s+(?:[A-Z][A-Za-z'.]+\s?){1,3}\d{1,5}",
re.IGNORECASE)
_ZIP_RE = re.compile(r"\b[A-Z]{2}\s+\d{5}(?:-\d{4})?\b")
# bare long unlabeled run -> reversible [MISC]. Not glued to letters (so an ISIN/ticker like
# US0378331005 stays intact substance), and a trailing sentence period doesn't block it.
_BARE_DIGITS_RE = re.compile(r"(?<![\dA-Za-z.\-])\d{9,}(?![A-Za-z]|\.?\d)")
_WORDX = r"[^\W_]" # unicode word char without underscore
def _fold(s):
"""1:1 length-preserving fold: strip diacritics per char + casefold, so 'Jonathán'
matches a stored ASCII 'Jonathan'. Length preserved so match spans map to the original."""
out = []
for ch in s:
d = unicodedata.normalize("NFKD", ch)
base = "".join(c for c in d if not unicodedata.combining(c))
out.append((base[0] if base else ch).lower())
return "".join(out)
def _bucket_amount(s):
num = re.sub(r"[^\d.]", "", s)
try:
v = float(num)
except ValueError:
return "~$?"
low = s.lower()
if "billion" in low or re.search(r"\d\s?bn?\b", low):
v *= 1_000_000_000
elif "million" in low or re.search(r"\d\s?mm?\b", low):
v *= 1_000_000
elif "thousand" in low or re.search(r"\d\s?k\b", low):
v *= 1_000
if v >= 1_000_000_000:
return f"~${round(v/1_000_000_000)}B"
if v >= 1_000_000:
return f"~${round(v/1_000_000)}M"
if v >= 1_000:
return f"~${round(v/1_000)}k"
return "~$<1k"
def _bucket_date(s):
iso = re.match(r"((?:19|20)\d{2})-(\d{2})-\d{2}", s)
if iso:
return f"Q{(int(iso.group(2))-1)//3 + 1} {iso.group(1)}"
q = re.search(r"(?i)Q([1-4])[\s\-]?((?:19|20)\d{2})", s)
if q:
return f"Q{q.group(1)} {q.group(2)}"
y = re.search(r"\b((?:19|20)\d{2})\b", s)
if y:
return y.group(1)
yy = re.search(r"[/.\-](\d{2})\b", s) # 2-digit year fallback
if yy:
return "19" + yy.group(1) if int(yy.group(1)) > 30 else "20" + yy.group(1)
return "(period)"
class ScrubState:
"""Local pseudonym map for ONE task: same surface string -> same token (injective).
The map is the de-anon key — local-only, never sent/serialized to a third party."""
def __init__(self):
self.token_map = {}
self._by_value = {}
self._counters = {t: 0 for t in TOKEN_TYPES}
self.tier1_dropped = []
def token_for(self, ttype, surface):
key = (ttype, surface)
tok = self._by_value.get(key)
if tok is None:
self._counters[ttype] += 1
tok = f"[{ttype}_{self._counters[ttype]}]"
self._by_value[key] = tok
self.token_map[tok] = surface
return tok
def _flatten_known(known_entities):
if not known_entities:
return []
type_by_key = {"persons": "PERSON", "orgs": "ORG", "funds": "FUND", "emails": "EMAIL", "locations": "LOC"}
out = []
for key, ttype in type_by_key.items():
for s in known_entities.get(key, []) or []:
s = (s or "").strip()
if s:
out.append((s, ttype))
return out
def _match_known(text, known_list, state):
"""Tokenize known entities, matched UNICODE-FOLDED + case-insensitive, longest-first,
extending over hyphen/apostrophe compounds so a known half of a double-barrelled
surname pulls in the whole token. Operates by span so we can fold for matching but
replace the ORIGINAL surface (preserved for rehydrate)."""
if not known_list:
return text
folded = _fold(text)
pairs = sorted(((_fold(unicodedata.normalize("NFKC", s)), t) for s, t in known_list),
key=lambda x: len(x[0]), reverse=True)
type_by_folded = {}
for fs, t in pairs:
type_by_folded.setdefault(fs, t)
alt = "|".join(re.escape(fs) for fs, _ in pairs if fs)
if not alt:
return text
rx = re.compile(r"(?<![0-9A-Za-z])(?:" + alt + r")(?![0-9A-Za-z])")
spans = []
for m in rx.finditer(folded):
st, en = m.start(), m.end()
ttype = type_by_folded.get(folded[st:en], "MISC")
# extend over hyphen/apostrophe compounds on both sides
while st > 1 and folded[st - 1] in "-'" and re.match(_WORDX, folded[st - 2] or ""):
k = st - 2
while k >= 0 and (re.match(_WORDX, folded[k]) or folded[k] in "-'"):
k -= 1
st = k + 1
while en < len(folded) - 1 and folded[en] in "-'" and re.match(_WORDX, folded[en + 1] or ""):
k = en + 1
while k < len(folded) and (re.match(_WORDX, folded[k]) or folded[k] in "-'"):
k += 1
en = k
spans.append((st, en, ttype))
if not spans:
return text
# merge overlaps, replace right-to-left in the ORIGINAL
spans.sort()
merged = [spans[0]]
for st, en, tt in spans[1:]:
ps, pe, ptt = merged[-1]
if st <= pe:
merged[-1] = (ps, max(pe, en), ptt)
else:
merged.append((st, en, tt))
for st, en, tt in reversed(merged):
surface = text[st:en]
text = text[:st] + state.token_for(tt, surface) + text[en:]
return text
def scrub(text, known_entities=None, bucket=False, state=None, ner_fn=None):
"""De-identify `text`. Returns (outbound_text, token_map, audit). Pass ner_fn (a
local-model NER callable text->[(surface,type)]) in production to catch unknown
names; without it the dictionary+regex path leaves unknown free-text names as
residual (callers should minimize-first and/or fail closed)."""
if text is None:
text = ""
st = state or ScrubState()
# NFKC-normalize so decomposed (NFD) names and ligatures align with the dictionary
# (else 'Reyés' in NFD or 'Steffen' with a ligature would miss and leak), and strip
# zero-width characters that could split a known name ('Rey<U+200B>es').
s = unicodedata.normalize("NFKC", str(text))
s = re.sub(r"[\u200b\u200c\u200d\u2060\ufeff]", "", s)
# 1) PRE-NEUTRALIZE pre-existing [TYPE_N] strings so they can't collide with our tokens.
s = _TOKEN_RE.sub(lambda m: st.token_for("MISC", m.group(0)), s)
# 2) TIER-1 DROP (labelled/structured; separator tolerant). Neutral marker, no value.
for label, pat in TIER1_PATTERNS:
def _drop(_m, _label=label):
st.tier1_dropped.append(_label)
return "[redacted]"
s = pat.sub(_drop, s)
# 3) KNOWN ENTITIES (unicode-folded, hyphen-extended).
s = _match_known(s, _flatten_known(known_entities), st)
# 4) STRUCTURED PII. Order matters: emails/urls/addresses, then DATES and AMOUNTS
# (so dashed ISO dates / ranges aren't swallowed by the permissive phone matcher),
# then PHONES, then any bare long digit run left over.
s = _EMAIL_RE.sub(lambda m: st.token_for("EMAIL", m.group(0)), s)
s = _URL_RE.sub(lambda m: st.token_for("URL", m.group(0)), s)
s = _ZIP_RE.sub(lambda m: st.token_for("LOC", m.group(0)), s) # state+ZIP before ADDR (which would eat the state)
s = _ADDR_RE.sub(lambda m: st.token_for("ADDR", m.group(0)), s)
for date_re in _DATE_RES:
if bucket:
s = date_re.sub(lambda m: _bucket_date(m.group(0)), s)
else:
s = date_re.sub(lambda m: st.token_for("DATE", m.group(0)), s)
for amt_re in _AMOUNT_RES:
if bucket:
s = amt_re.sub(lambda m: _bucket_amount(m.group(0)), s)
else:
s = amt_re.sub(lambda m: st.token_for("AMOUNT", m.group(0)), s)
s = _PHONE_RE.sub(lambda m: st.token_for("PHONE", m.group(0)), s)
# bare long unlabeled digit runs -> reversible [MISC] (never leak digits to Claude;
# don't DROP, since these may be substance like share counts / security ids).
s = _BARE_DIGITS_RE.sub(lambda m: st.token_for("MISC", m.group(0)), s)
# 5) NER BACKSTOP for unknown names (production: local Qwen). Tokenize what it finds.
# A connection failure here propagates so the caller can FAIL CLOSED rather than
# emit name-blind. Sort longest-first so a full name is tokenized before its parts.
if ner_fn is not None:
for surface, ntype in sorted((ner_fn(s) or []), key=lambda e: len(e[0] or ""), reverse=True):
surface = (surface or "").strip()
if not surface or _TOKEN_RE.search(surface):
continue
tt = ntype if ntype in TOKEN_TYPES else "PERSON"
s = re.sub(r"(?<![0-9A-Za-z])" + re.escape(surface) + r"(?![0-9A-Za-z])",
lambda m: st.token_for(tt, m.group(0)), s)
audit = {
"token_count": len(st.token_map),
"tokens_by_type": _counts_by_type(st.token_map),
"tier1_dropped_count": len(st.tier1_dropped),
"tier1_dropped_kinds": sorted(set(st.tier1_dropped)),
"bucketed": bool(bucket),
"outbound_chars": len(s),
}
return s, dict(st.token_map), audit
def _counts_by_type(token_map):
out = {}
for tok in token_map:
m = re.match(r"\[([A-Z]+)_\d+\]", tok)
if m:
out[m.group(1)] = out.get(m.group(1), 0) + 1
return out
def rehydrate(text, token_map):
"""Substitute real values back in via a SINGLE non-overlapping pass (one alternation,
longest tokens first) so an inserted value that is itself token-shaped can't be
re-substituted by a later pass. Tier-1 drops are not restorable — excluded by design."""
s = str(text or "")
if not token_map:
return s
rx = re.compile("|".join(re.escape(t) for t in sorted(token_map, key=len, reverse=True)))
return rx.sub(lambda m: token_map[m.group(0)], s)
def residual_tokens(text):
return _TOKEN_RE.findall(str(text or ""))
# ── known-entity dictionary from the CRM (read-only) ───────────────────────────
def build_known_entities(db_path):
"""Deterministic dictionary of OUR entities to tokenize, read-only from the CRM.
Includes full names AND every name part (so mid-prose surnames are caught) + email
local-parts. RAISES on read failure — callers must fail closed, never run name-blind."""
persons, orgs, funds, emails = set(), set(), set(), set()
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
def _add_person(name):
name = (name or "").strip()
if len(name) >= 2:
persons.add(name)
for part in re.split(r"[\s'\-]+", name):
if len(part) >= 2 and not part.isdigit(): # index every part incl. short surnames (Wu, Li)
persons.add(part)
def _safe(q, fn):
try:
for r in conn.execute(q):
fn(r)
except sqlite3.OperationalError:
pass
# No `deleted_at` filter: tokenizing a soft-deleted name is desirable, and the live
# contacts/canonical schemas vary on that column — filtering on it silently zeroed the
# whole dictionary (a missing-column OperationalError swallowed by _safe).
_safe("SELECT display_name, primary_email FROM canonical_entities WHERE entity_kind='person'",
lambda r: (_add_person(r["display_name"]), r["primary_email"] and emails.add(r["primary_email"].strip().lower())))
_safe("SELECT first_name, last_name, email FROM contacts",
lambda r: (_add_person(f"{r['first_name'] or ''} {r['last_name'] or ''}"),
r["email"] and emails.add(r["email"].strip().lower())))
_safe("SELECT full_name, email FROM fundraising_contacts",
lambda r: (_add_person(r["full_name"]), r["email"] and emails.add(r["email"].strip().lower())))
_safe("SELECT display_name FROM canonical_entities WHERE entity_kind IN ('organization','investor','lp')",
lambda r: r["display_name"] and orgs.add(r["display_name"].strip()))
_safe("SELECT name FROM organizations", lambda r: r["name"] and orgs.add(r["name"].strip()))
_safe("SELECT investor_name FROM fundraising_investors", lambda r: r["investor_name"] and orgs.add(r["investor_name"].strip()))
_safe("SELECT fund_name FROM fundraising_funds", lambda r: r["fund_name"] and funds.add(r["fund_name"].strip()))
conn.close()
for e in list(emails):
lp = e.split("@")[0]
if len(lp) >= 3 and not lp.isdigit():
persons.add(lp)
return {"persons": sorted(persons, key=len, reverse=True),
"orgs": sorted(orgs, key=len, reverse=True),
"funds": sorted(funds, key=len, reverse=True),
"emails": sorted(emails, key=len, reverse=True)}
# ── audit logging (metadata only — never the map or real values) ───────────────
def _now():
return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
def log_scrub(conn, actor_id, audit, task=None, session_id=None, target_id=None, source="mcp"):
payload = {"task": task, "session_id": session_id,
"token_count": audit.get("token_count"), "tokens_by_type": audit.get("tokens_by_type"),
"tier1_dropped_count": audit.get("tier1_dropped_count"),
"tier1_dropped_kinds": audit.get("tier1_dropped_kinds"),
"bucketed": audit.get("bucketed"), "outbound_chars": audit.get("outbound_chars")}
conn.execute(
"""INSERT INTO interaction_log (id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at)
VALUES (?,?, 'agent', ?, 'redaction.scrub', 'canonical_entity', ?, ?, ?, ?)""",
(str(uuid.uuid4()), _now(), actor_id, target_id, json.dumps(payload), source, _now()))
def log_rehydrate(conn, actor_id, tokens_rehydrated, residual, human_decision="pending",
reviewer_id=None, task=None, session_id=None, source="mcp"):
payload = {"task": task, "session_id": session_id, "tokens_rehydrated": tokens_rehydrated,
"residual_placeholders": residual, "human_decision": human_decision, "reviewer_id": reviewer_id}
conn.execute(
"""INSERT INTO interaction_log (id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at)
VALUES (?,?, 'agent', ?, 'redaction.rehydrate', 'canonical_entity', NULL, ?, ?, ?)""",
(str(uuid.uuid4()), _now(), actor_id, json.dumps(payload), source, _now()))
+133
View File
@@ -0,0 +1,133 @@
#!/usr/bin/env python3
"""Re-identification spot-check (redaction-rehydration.md §6) — OFFLINE + SYNTHETIC.
A deterministic approximation of "feed only the scrubbed prompt to a model and try to
recover who it is." Three probes:
A. Exact/normalized leak gate (MUST PASS): re-scan the scrubbed payload for ANY known
real value or Tier-1 string under normalization (case, punctuation, reversed
'Last First', email local-part). Any hit = tokenizer miss = FAIL.
B. Descriptive-identifier residual (BOUNDED): phrases that re-identify even with names
tokenized ("the family that sold the mining company in Texas"). The deterministic
scrubber is not expected to catch these (the on-infra local-Qwen pass + the
minimize-first summary do); this probe MEASURES the residual and fails only if it
EXCEEDS a committed ceiling, so leakage can't silently grow.
C. Inference via bucketing: with bucket=True, no exact amount/identity-date survives,
and a (amount-band, year, sector) tuple is not unique to one synthetic entity.
Run: cd backend && python3 redaction/test_reidentification.py
"""
import os
import re
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import scrub as R # noqa: E402
FAILS = []
def check(cond, msg):
print((" PASS " if cond else " FAIL ") + msg)
if not cond:
FAILS.append(msg)
def _norm(s):
return re.sub(r"[^a-z0-9]+", " ", s.lower()).strip()
# Probe A — known synthetic entities that must NOT be recoverable from the scrubbed text
KB = {
"persons": ["Jonathan Reyes", "Marta Quine"],
"orgs": ["Cedar Point Capital"],
"emails": ["jon@cedarpoint.example"],
"tier1": ["000123456789", "123-45-6789"],
}
RAW_A = ("Jonathan Reyes at Cedar Point Capital (jon@cedarpoint.example) is cooling; Reyes wants better "
"terms. Marta Quine disagrees. acct 000123456789. Substance: fee and lock-up objections.")
KNOWN_A = {"persons": ["Jonathan Reyes", "Reyes", "Marta Quine", "Marta"],
"orgs": ["Cedar Point Capital"], "funds": [], "emails": ["jon@cedarpoint.example"]}
def probe_a():
print("\n[probe A — exact/normalized leak gate]")
outbound, _, _ = R.scrub(RAW_A, known_entities=KNOWN_A, bucket=False)
nout = _norm(outbound)
hits = []
for cat in ("persons", "orgs", "emails", "tier1"):
for v in KB[cat]:
variants = {_norm(v)}
if " " in v and cat == "persons":
a, b = v.split()[0], v.split()[-1]
variants.add(_norm(f"{b} {a}")) # reversed Last First
variants.add(_norm(b)) # bare surname
if "@" in v:
variants.add(_norm(v.split("@")[0])) # email local-part
for var in variants:
if var and var in nout:
hits.append((v, var))
check(not hits, f"no known identifier recoverable from scrubbed text (hits={hits})")
# Probe B — descriptive re-identifiers (bounded residual)
DESCRIPTIVE = [
"the family that sold the mining company in Texas",
"the former CTO of a well-known payments unicorn",
"the senator's brother who runs a family office",
]
RAW_B = "Notes: our contact is " + DESCRIPTIVE[0] + ". Another is " + DESCRIPTIVE[1] + ". A third is " + DESCRIPTIVE[2] + "."
RESIDUAL_CEILING = 3 # known residual the deterministic scrubber alone does not catch;
# the on-infra Qwen pass + minimize-first summary drive this toward 0.
def probe_b():
print("\n[probe B — descriptive-identifier residual, bounded]")
outbound, _, _ = R.scrub(RAW_B, known_entities={"persons": [], "orgs": [], "funds": [], "emails": []})
surviving = [d for d in DESCRIPTIVE if d in outbound]
for d in surviving:
print(f" flagged residual (handled on-infra by Qwen/minimize-first): {d!r}")
check(len(surviving) <= RESIDUAL_CEILING,
f"descriptive residual within committed ceiling ({len(surviving)} <= {RESIDUAL_CEILING})")
# Probe C — bucketing destroys exact values + singling-out
ENTITIES_C = [
{"name": "A", "amount": "$5,200,000", "date": "1986-02-10", "sector": "energy"},
{"name": "B", "amount": "$4,800,000", "date": "1986-03-20", "sector": "energy"}, # same band/year/sector as A
{"name": "C", "amount": "$25,000,000", "date": "1991-09-01", "sector": "bitcoin"},
]
def probe_c():
print("\n[probe C — inference via bucketing]")
raw = " ".join(f"Investor commits {e['amount']} on {e['date']} in {e['sector']}." for e in ENTITIES_C)
outbound, _, _ = R.scrub(raw, known_entities={"persons": [], "orgs": [], "funds": [], "emails": []}, bucket=True)
check(re.search(r"\$\s?\d[\d,]{2,}", outbound) is None, "no exact $ amount survives bucketing")
check(re.search(r"\b(?:19|20)\d{2}-\d{2}-\d{2}\b", outbound) is None, "no exact date survives bucketing")
# singling-out: the (amount-band, year, sector) tuple must not be unique to one entity
tuples = {}
for e in ENTITIES_C:
band = R._bucket_amount(e["amount"])
year = e["date"][:4]
tuples.setdefault((band, year, e["sector"]), []).append(e["name"])
unique = [k for k, v in tuples.items() if len(v) == 1]
# A and B collapse to the same bucket-tuple; C is alone but that's an accepted single in this fixture
check(any(len(v) > 1 for v in tuples.values()),
f"bucketing collapses distinct entities into shared bands (tuples={ {k: v for k,v in tuples.items()} })")
def main():
probe_a()
probe_b()
probe_c()
print()
if FAILS:
print(f"FAILED ({len(FAILS)}):")
for f in FAILS:
print(f" - {f}")
sys.exit(1)
print("ALL PASS (re-identification spot-check)")
if __name__ == "__main__":
main()
+171
View File
@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""Golden-file LEAK TEST for the redaction boundary, hardened across two adversarial
leak-hunts. Synthetic fixtures only (guardrail #9).
Per case: must_vanish (never reach Claude), tier1_excluded (also not in the map),
substance (survives verbatim), perfect inverse, leak-proof audit. Plus a round-2
"hardening vectors" section that regression-locks: NFD/ligature unicode names,
slash/comma SSN + SWIFT + passport Tier-1 drops, sentence-final bare digits, the
rehydrate collision fix, and the FALSE-POSITIVE survival of non-money quantities /
version numbers / ISINs (we de-identify, we don't destroy substance).
Deterministic + offline (the dictionary is each case's own lists; the unknown-name
NER backstop is exercised in test_grounding_boundary.py). Currency-CUED amounts are
caught here; bare magnitudes ('5MM') are left to minimize-first + NER by design.
Run: cd backend && python3 redaction/test_scrub_leak.py
"""
import json
import os
import re
import sqlite3
import sys
import unicodedata
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import scrub as R # noqa: E402
CASES = [
{
"name": "labeled-tier1 + core tier2",
"raw": ("Jonathan Reyes (jon@cedarpoint.example) at Cedar Point Capital is cooling on Fund III. "
"Reyes would commit $5,000,000. Wire to acct 000123456789 spooked compliance. Met 1986-03-12. "
"Substance: the objection is fee load and lock-up; sentiment negative on the energy thesis."),
"known": {"persons": ["Jonathan Reyes", "Reyes"], "orgs": ["Cedar Point Capital"],
"funds": ["Fund III"], "emails": ["jon@cedarpoint.example"]},
"must_vanish": ["Jonathan Reyes", "Reyes", "jon@cedarpoint.example", "Cedar Point Capital",
"Fund III", "$5,000,000", "1986-03-12", "000123456789"],
"tier1_excluded": ["000123456789"],
"substance": ["the objection is fee load and lock-up", "sentiment negative on the energy thesis"],
},
{
"name": "worded/coded amounts, intl phone, urls, non-iso dates",
"raw": ("He would commit five million dollars; a $5MM ticket, USD 5,000,000, and a $3-5M range. "
"Reach +44 20 7946 0958 or www.cedarpoint.example; profile linkedin.com/in/jreyes. "
"Met March 12, 1986 and again 3/12/86. Concern: liquidity timeline only."),
"known": {"persons": [], "orgs": [], "funds": [], "emails": []},
"must_vanish": ["five million dollars", "$5MM", "USD 5,000,000", "$3-5M", "+44 20 7946 0958",
"www.cedarpoint.example", "linkedin.com/in/jreyes", "March 12, 1986", "3/12/86"],
"tier1_excluded": [],
"substance": ["Concern: liquidity timeline only"],
},
{
"name": "diacritics + hyphenated + short surnames",
"raw": ("Spoke to Jonathán Reyés about the thesis. Reyes-Castellanos co-invests. "
"Wu is warm; Li wants a side letter on fees."),
"known": {"persons": ["Jonathan Reyes", "Reyes", "Li Wu", "Li", "Wu"], "orgs": [], "funds": [], "emails": []},
"must_vanish": ["Jonathán", "Reyés", "Castellanos", "Wu", "Li"],
"tier1_excluded": [],
"substance": ["wants a side letter on fees"],
},
{
"name": "tier1 separators (slash/comma/space) + swift + address + ext",
"raw": ("Wire to acct # 1234-5678-9012 spooked compliance. SSN 123/45/6789 and 123 45 6789 on file. "
"Via SWIFT CHASUS33XXX. Lives at 42 Maple Avenue, Greenwich, CT 06830. Office 212-555-0188 x4021. "
"Substance: wants a co-investment right."),
"known": {"persons": [], "orgs": [], "funds": [], "emails": []},
"must_vanish": ["1234-5678-9012", "123/45/6789", "123 45 6789", "CHASUS33XXX", "42 Maple Avenue",
"212-555-0188", "x4021", "06830"],
"tier1_excluded": ["1234-5678-9012", "123/45/6789", "123 45 6789", "CHASUS33XXX"],
"substance": ["wants a co-investment right"],
},
]
FAILS = []
def check(cond, msg):
print((" PASS " if cond else " FAIL ") + msg)
if not cond:
FAILS.append(msg)
def tier1_redacted(raw):
s = unicodedata.normalize("NFKC", raw)
for _, pat in R.TIER1_PATTERNS:
s = pat.sub("[redacted]", s)
return s
def main():
db = os.path.join(__import__("tempfile").mkdtemp(), "log.db")
conn = sqlite3.connect(db)
conn.execute("""CREATE TABLE interaction_log (id TEXT PRIMARY KEY, ts TEXT, actor_type TEXT, actor_id TEXT,
action TEXT, target_type TEXT, target_id TEXT, payload TEXT, source TEXT, created_at TEXT)""")
for case in CASES:
raw, known = case["raw"], case["known"]
print(f"\n[{case['name']}]")
check(not R.residual_tokens(raw), "raw fixture has no [TYPE_N]-shaped strings")
outbound, tmap, audit = R.scrub(raw, known_entities=known, bucket=False)
for v in case["must_vanish"]:
check(v not in outbound, f"identifier {v!r} absent from outbound")
for v in case["tier1_excluded"]:
check(all(v not in mv for mv in tmap.values()), f"Tier-1 {v!r} excluded, not tokenized")
for s in case["substance"]:
check(s in outbound, f"substance survives: {s!r}")
check(len(set(tmap.values())) == len(tmap), "map injective")
check(R.rehydrate(outbound, tmap) == tier1_redacted(raw), "rehydrate == raw w/ Tier-1 redacted (perfect inverse)")
check(not R.residual_tokens(R.rehydrate(outbound, tmap)), "no placeholder survives rehydrate")
R.log_scrub(conn, "architect", audit, task="g", session_id="t", source="mcp")
conn.commit()
blob = " ".join(r[0] for r in conn.execute("SELECT payload FROM interaction_log"))
check(all(v not in blob for v in case["must_vanish"]), "audit log carries NO sensitive value")
# ── round-2 hardening vectors ──
def out(raw, known=None):
o, _m, _a = R.scrub(raw, known_entities=known or {}, bucket=False)
return o
print("\n[unicode — NFD / ligature names]")
nfd = unicodedata.normalize("NFD", "Jonathan Reyés is cooling.")
check("Reyés" not in unicodedata.normalize("NFKC", out(nfd, {"persons": ["Jonathan Reyes", "Reyes"]})),
"NFD-decomposed accented name does not leak")
check("Steffen" not in out("LP Steffen is cooling.", {"persons": ["Steffen"]}),
"ligature name (Steffen) does not leak")
print("\n[tier1 — slash/comma/swift/passport]")
o, m, _ = R.scrub("Reyes SSN 123/45/6789 and 123,45,6789 on the W9.", known_entities={}, bucket=False)
check("123/45/6789" not in o and "123,45,6789" not in o, "slash/comma SSN dropped")
check(all("123/45/6789" not in v and "123,45,6789" not in v for v in m.values()), "SSN not in map (excluded)")
check("CHASUS33XXX" not in out("Wire via SWIFT CHASUS33XXX today."), "SWIFT/BIC dropped")
check("a1234567" not in out("Passport number a1234567 expires 2030."), "passport-with-'number' dropped")
print("\n[bare digits at sentence end]")
check("123456789012" not in out("The security ID is 123456789012."), "9+ digit run at sentence end tokenized")
print("\n[FALSE-POSITIVE survival — substance preserved]")
check("3m tall" in out("The wall is 3m tall."), "'3m tall' (meters) NOT eaten as money")
check("250k followers" in out("She has 250k followers on X."), "'250k followers' NOT eaten as money")
check("3.14.159" in out("Pi is roughly 3.14.159 here."), "version-ish number NOT eaten as a date")
check("US0378331005" in out("We hold ISIN US0378331005 in the sleeve."), "ISIN preserved (substance, not dropped)")
check("2019-2024" in out("Track record spans 2019-2024."), "year range NOT mislabeled as a phone")
print("\n[integrity — rehydrate single-pass, no cascade]")
raw = "Refer to [MISC_2] then [PERSON_9]."
oo, mm, _ = R.scrub(raw, known_entities={}, bucket=False)
check(R.rehydrate(oo, mm) == raw, "same-length placeholder literals round-trip without cascade")
print("\n[round-4 — alpha-prefixed accounts, MM, zero-width]")
o, m, _ = R.scrub("Acct A123456789012 flagged. Member ID: X4451200931 noted. Wire to GB123456789012 today.",
known_entities={}, bucket=False)
for v in ["A123456789012", "X4451200931", "GB123456789012"]:
check(v not in o, f"alpha-prefixed labelled identifier {v!r} dropped")
check(all(v not in mv for mv in m.values()), f"{v!r} excluded, not tokenized")
o2 = out("Commit of $5MM and €10MM confirmed.")
check("$5MM" not in o2 and "5M " not in o2 and "MM" not in o2, "double-magnitude $5MM fully tokenized (no stray 'M')")
zw = "LP Reyes is cooling." # zero-width space splitting the surname
check("Reyes" not in out(zw, {"persons": ["Reyes"]}) and "Reyes" not in out(zw, {"persons": ["Reyes"]}),
"zero-width-split known name does not leak")
conn.close()
print()
if FAILS:
print(f"FAILED ({len(FAILS)}):")
for f in FAILS:
print(f" - {f}")
sys.exit(1)
print("ALL PASS (redaction leak test — hardened x2)")
if __name__ == "__main__":
main()