cd3cca725c
- Dual sign-off is now the default (thesis_required_approvals defaults to 2).
- Entity-merge review queue (migration 0003): the fuzzy/Qwen tier no longer
auto-merges — it writes CANDIDATES (entity_merge_candidates) with a same/different
suggestion + confidence + reason for a human to approve (merge) or reject (keep
separate). entity_merge.py applies/rejects (durable via entity_merges, soft-delete,
repoint links+edges); decided pairs aren't re-surfaced.
- entity_jobs.py: UI-triggered background index jobs (rebuild/update/find-duplicates)
as subprocesses with a one-at-a-time lock; status in /api/system/status.
- server.py: /api/index/{rebuild,update}, /api/entities/find-duplicates,
/api/entities/merge-candidates [+ /{id} decide] — admin-gated.
- docs/thesis-seed-v2.md: concrete, plain-English rewrite per Grant's feedback.
Backend verified end-to-end on synthetic data (candidate gen -> approve/reject).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
80 lines
3.8 KiB
Python
80 lines
3.8 KiB
Python
"""Human-decided entity-merge candidates (Phase 1). Called by server.py routes —
|
|
NOT an agent tool. A partner approves (merge) or rejects (keep separate) each
|
|
candidate the fuzzy tier surfaced. Approvals apply the merge and record it in
|
|
entity_merges (durable, so deterministic resolution respects it); rejections are
|
|
remembered so the pair is not re-surfaced. Everything is logged.
|
|
"""
|
|
import json
|
|
import sqlite3
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
def _now():
|
|
return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
|
|
|
|
|
|
def _conn(db):
|
|
c = sqlite3.connect(db)
|
|
c.row_factory = sqlite3.Row
|
|
c.execute("PRAGMA foreign_keys=ON")
|
|
return c
|
|
|
|
|
|
def _log(c, actor_id, action, target_id, payload):
|
|
c.execute("""INSERT INTO interaction_log
|
|
(id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?)""",
|
|
(str(uuid.uuid4()), _now(), "human", actor_id, action, "canonical_entity", target_id,
|
|
json.dumps(payload) if payload is not None else None, "crm_ui", _now()))
|
|
|
|
|
|
def list_candidates(db, status="pending"):
|
|
c = _conn(db)
|
|
rows = [dict(r) for r in c.execute(
|
|
"SELECT * FROM entity_merge_candidates WHERE status=? ORDER BY confidence DESC, created_at DESC", (status,))]
|
|
c.close()
|
|
return {"candidates": rows, "count": len(rows)}
|
|
|
|
|
|
def _apply_merge(c, survivor, loser, confidence, reason):
|
|
"""Merge `loser` into `survivor`: record durably, repoint links + relationship
|
|
edges, soft-delete the loser (never hard-delete — guardrail #3)."""
|
|
c.execute("""INSERT INTO entity_merges (merged_id, survivor_id, confidence, reason, created_at)
|
|
VALUES (?,?,?,?,?)
|
|
ON CONFLICT(merged_id) DO UPDATE SET survivor_id=excluded.survivor_id,
|
|
confidence=excluded.confidence, reason=excluded.reason""",
|
|
(loser, survivor, confidence or 0.8, reason, _now()))
|
|
c.execute("UPDATE entity_links SET canonical_id=?, match_kind='fuzzy_merge' WHERE canonical_id=?",
|
|
(survivor, loser))
|
|
# repoint relationship edges (member_of, etc.); OR IGNORE avoids unique clashes,
|
|
# then drop any leftover edges the survivor already had.
|
|
c.execute("UPDATE OR IGNORE relationship_edges SET src_id=? WHERE src_id=?", (survivor, loser))
|
|
c.execute("UPDATE OR IGNORE relationship_edges SET dst_id=? WHERE dst_id=?", (survivor, loser))
|
|
c.execute("DELETE FROM relationship_edges WHERE src_id=? OR dst_id=?", (loser, loser))
|
|
c.execute("UPDATE canonical_entities SET deleted_at=?, updated_at=? WHERE id=?", (_now(), _now(), loser))
|
|
|
|
|
|
def decide(db, candidate_id, decision, decided_by):
|
|
if decision not in ("approve", "reject"):
|
|
return {"error": "bad_decision", "allowed": ["approve", "reject"]}
|
|
c = _conn(db)
|
|
cand = c.execute("SELECT * FROM entity_merge_candidates WHERE id=?", (candidate_id,)).fetchone()
|
|
if not cand:
|
|
c.close()
|
|
return {"error": "not_found", "candidate_id": candidate_id}
|
|
if cand["status"] != "pending":
|
|
c.close()
|
|
return {"error": "already_decided", "status": cand["status"]}
|
|
|
|
if decision == "approve":
|
|
_apply_merge(c, cand["entity_a"], cand["entity_b"], cand["confidence"], cand["reason"])
|
|
c.execute("UPDATE entity_merge_candidates SET status=?, decided_by=?, decided_at=? WHERE id=?",
|
|
("approved" if decision == "approve" else "rejected", decided_by, _now(), candidate_id))
|
|
_log(c, decided_by, f"entity.merge_{decision}d", cand["entity_a"],
|
|
{"survivor": cand["entity_a"], "loser": cand["entity_b"], "names": [cand["name_a"], cand["name_b"]]})
|
|
c.commit()
|
|
c.close()
|
|
return {"id": candidate_id, "decision": decision,
|
|
"merged": (cand["entity_b"] if decision == "approve" else None)}
|