Phase 1: dual approval default, web-UI index jobs + merge review queue, thesis v2

- Dual sign-off is now the default (thesis_required_approvals defaults to 2).
- Entity-merge review queue (migration 0003): the fuzzy/Qwen tier no longer
  auto-merges — it writes CANDIDATES (entity_merge_candidates) with a same/different
  suggestion + confidence + reason for a human to approve (merge) or reject (keep
  separate). entity_merge.py applies/rejects (durable via entity_merges, soft-delete,
  repoint links+edges); decided pairs aren't re-surfaced.
- entity_jobs.py: UI-triggered background index jobs (rebuild/update/find-duplicates)
  as subprocesses with a one-at-a-time lock; status in /api/system/status.
- server.py: /api/index/{rebuild,update}, /api/entities/find-duplicates,
  /api/entities/merge-candidates [+ /{id} decide] — admin-gated.
- docs/thesis-seed-v2.md: concrete, plain-English rewrite per Grant's feedback.

Backend verified end-to-end on synthetic data (candidate gen -> approve/reject).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Keysat
2026-06-05 11:14:12 -05:00
parent fa2a5ce95f
commit cd3cca725c
8 changed files with 336 additions and 65 deletions
+47 -63
View File
@@ -1,19 +1,15 @@
#!/usr/bin/env python3
"""Phase-0 Workstream B3 — fuzzy entity-resolution tier (local Qwen).
"""Phase-1 — fuzzy entity-resolution tier (local Qwen), REVIEW-QUEUE mode.
The deterministic tier (entity_resolution.py) merges only provable matches and
FLAGS the hard name-variant candidates (same firm + surname, different first
name/email) without guessing. This tier asks the local Qwen model (Spark Control
/v1/chat/completions — sovereign, on Ten31 infra) to adjudicate each candidate
and merges the confirmed ones.
A merge repoints the loser's entity_links to the survivor and soft-deletes the
loser canonical entity (deleted_at; never hard-deleted — guardrail #3). Every
merge is written to the interaction_log (guardrail #5). Idempotent: re-running
finds no new candidates once merged.
The deterministic tier (entity_resolution.py) flags hard name-variant candidates
(same firm + surname, different first name/email) without guessing. This tier asks
the local Qwen model (Spark Control — sovereign) for a SUGGESTION on each, and
writes a CANDIDATE row to entity_merge_candidates for a human to approve (merge)
or reject (keep separate) in the CRM web UI. It NO LONGER auto-merges — uncertainty
is surfaced, not applied (the human decides). Already-decided pairs and
already-merged entities are skipped, so re-running is safe and quiet.
python3 backend/ingest/fuzzy_resolve.py --db data/crm_dev.db
python3 backend/ingest/fuzzy_resolve.py --db data/crm_dev.db --dry-run
"""
import argparse
import json
@@ -32,7 +28,7 @@ _SYSTEM = ("You are an entity-resolution assistant for a CRM. Decide if the list
def _now():
return datetime.now(timezone.utc).isoformat()
return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
def _ask(members, firm):
@@ -40,76 +36,64 @@ def _ask(members, firm):
prompt = (f"Firm: {firm or 'unknown'}\nPeople: {people}\n\n"
"Are these the SAME person under name variants? "
'Answer only JSON: {"same": true|false, "confidence": 0.0-1.0, "reason": "..."}')
return llm.chat_json(prompt, system=_SYSTEM, max_tokens=160) or {"same": False, "confidence": 0.0}
return llm.chat_json(prompt, system=_SYSTEM, max_tokens=160) or {"same": False, "confidence": 0.0, "reason": ""}
def _survivor(members):
# Prefer a member with an email, then the longest (most complete) name.
return sorted(members, key=lambda m: (bool(m[2]), len(m[1])), reverse=True)[0]
def run(db, threshold=0.7, dry_run=False):
counts, candidates = er.run(db) # ensure deterministic state + fresh candidates
def run(db, db_path=None):
db = db_path or db
counts, candidates = er.run(db) # deterministic state (respects prior merges) + fresh candidates
conn = sqlite3.connect(db)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys=ON")
name_of = {r["id"]: r["display_name"] for r in conn.execute("SELECT id, display_name FROM canonical_entities")}
merges = []
decided = {frozenset((r["entity_a"], r["entity_b"]))
for r in conn.execute("SELECT entity_a, entity_b FROM entity_merge_candidates")}
merged = {r[0] for r in conn.execute("SELECT merged_id FROM entity_merges")}
created = skipped = 0
for cand in candidates:
members = cand["members"]
verdict = _ask(members, name_of.get(cand["org"]))
same = bool(verdict.get("same")) and float(verdict.get("confidence", 0)) >= threshold
decision = {"surname": cand["surname"], "firm": name_of.get(cand["org"]),
"members": [{"id": m[0], "name": m[1]} for m in members],
"same": same, "confidence": verdict.get("confidence"),
"reason": verdict.get("reason")}
if same:
keep = _survivor(members)
losers = [m for m in members if m[0] != keep[0]]
decision["merged_into"] = {"id": keep[0], "name": keep[1]}
if not dry_run:
for loser in losers:
# Record the merge durably so deterministic re-runs respect it.
conn.execute("""INSERT INTO entity_merges (merged_id, survivor_id, confidence, reason, created_at)
VALUES (?,?,?,?,?)
ON CONFLICT(merged_id) DO UPDATE SET survivor_id=excluded.survivor_id,
confidence=excluded.confidence, reason=excluded.reason""",
(loser[0], keep[0], verdict.get("confidence", 0.7),
verdict.get("reason"), _now()))
conn.execute("UPDATE entity_links SET canonical_id=?, match_kind='fuzzy_merge', confidence=? "
"WHERE canonical_id=?", (keep[0], verdict.get("confidence", 0.7), loser[0]))
conn.execute("UPDATE canonical_entities SET deleted_at=?, updated_at=? WHERE id=?",
(_now(), _now(), loser[0]))
conn.execute("""INSERT INTO interaction_log
(id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at)
VALUES (?,?,?,?,?,?,?,?,?,?)""",
(str(uuid.uuid4()), _now(), "agent", "qwen_entity_resolver", "entity.merged",
"canonical_entity", keep[0], json.dumps(decision), "ingest", _now()))
merges.append(decision)
keep = _survivor(members)
losers = [m for m in members if m[0] != keep[0]]
verdict = _ask(members, name_of.get(cand["org"])) # one Qwen call per group
for loser in losers:
pair = frozenset((keep[0], loser[0]))
if pair in decided or loser[0] in merged or keep[0] in merged:
skipped += 1
continue
conn.execute("""
INSERT INTO entity_merge_candidates
(id, entity_a, entity_b, name_a, name_b, email_a, email_b, context, verdict, confidence, reason, status, created_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?, 'pending', ?)
ON CONFLICT(entity_a, entity_b) DO NOTHING
""", (str(uuid.uuid4()), keep[0], loser[0], keep[1], loser[1], keep[2], loser[2],
f"{cand['surname']} @ {name_of.get(cand['org']) or 'unknown'}",
'same' if verdict.get('same') else 'different', verdict.get('confidence'),
verdict.get('reason'), _now()))
decided.add(pair)
created += 1
if not dry_run:
conn.commit()
live_people = conn.execute("SELECT COUNT(*) FROM canonical_entities WHERE entity_kind='person' AND deleted_at IS NULL").fetchone()[0]
conn.execute("""INSERT INTO interaction_log
(id, ts, actor_type, actor_id, action, target_type, payload, source, created_at)
VALUES (?,?,?,?,?,?,?,?,?)""",
(str(uuid.uuid4()), _now(), "agent", "qwen_entity_resolver", "entity.candidates_generated",
"canonical_entities", json.dumps({"created": created, "skipped": skipped}), "ingest", _now()))
conn.commit()
conn.close()
return merges, live_people
return {"candidates_created": created, "skipped_existing": skipped, "flagged_groups": len(candidates)}
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--db", default="data/crm_dev.db")
ap.add_argument("--threshold", type=float, default=0.7)
ap.add_argument("--dry-run", action="store_true")
args = ap.parse_args()
merges, live = run(args.db, threshold=args.threshold, dry_run=args.dry_run)
print(f"Adjudicated {len(merges)} candidate group(s){' (dry run)' if args.dry_run else ''}:")
for m in merges:
names = " / ".join(p["name"] for p in m["members"])
verdict = f"MERGE -> {m['merged_into']['name']}" if m.get("merged_into") else "keep separate"
print(f" [{m['surname']}] {names}: same={m['same']} conf={m['confidence']} => {verdict}")
if m.get("reason"):
print(f" reason: {m['reason']}")
print(f"Live person entities now: {live}")
s = run(args.db)
print(f"Fuzzy review: {s['candidates_created']} new candidate(s) for review, "
f"{s['skipped_existing']} already decided ({s['flagged_groups']} flagged groups).")
if __name__ == "__main__":