"""Human-decided entity-merge candidates (Phase 1). Called by server.py routes — NOT an agent tool. A partner approves (merge) or rejects (keep separate) each candidate the fuzzy tier surfaced. Approvals apply the merge and record it in entity_merges (durable, so deterministic resolution respects it); rejections are remembered so the pair is not re-surfaced. Everything is logged. """ import json import sqlite3 import uuid from datetime import datetime, timezone def _now(): return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z" def _conn(db): c = sqlite3.connect(db) c.row_factory = sqlite3.Row c.execute("PRAGMA foreign_keys=ON") return c def _log(c, actor_id, action, target_id, payload): c.execute("""INSERT INTO interaction_log (id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at) VALUES (?,?,?,?,?,?,?,?,?,?)""", (str(uuid.uuid4()), _now(), "human", actor_id, action, "canonical_entity", target_id, json.dumps(payload) if payload is not None else None, "crm_ui", _now())) def list_candidates(db, status="pending"): c = _conn(db) # Only surface candidates whose BOTH sides are still live canonical entities. # When entity resolution self-heals (a grid "twin" matched back, so its # duplicate entity is soft-deleted), the candidate that paired them becomes # moot — hide it instead of asking a human to adjudicate a tombstone. rows = [dict(r) for r in c.execute( """SELECT mc.* FROM entity_merge_candidates mc JOIN canonical_entities a ON a.id = mc.entity_a AND a.deleted_at IS NULL JOIN canonical_entities b ON b.id = mc.entity_b AND b.deleted_at IS NULL WHERE mc.status=? ORDER BY mc.confidence DESC, mc.created_at DESC""", (status,))] c.close() return {"candidates": rows, "count": len(rows)} def _apply_merge(c, survivor, loser, confidence, reason): """Merge `loser` into `survivor`: record durably, repoint links + relationship edges, soft-delete the loser (never hard-delete — guardrail #3).""" c.execute("""INSERT INTO entity_merges (merged_id, survivor_id, confidence, reason, created_at) VALUES (?,?,?,?,?) ON CONFLICT(merged_id) DO UPDATE SET survivor_id=excluded.survivor_id, confidence=excluded.confidence, reason=excluded.reason""", (loser, survivor, confidence or 0.8, reason, _now())) c.execute("UPDATE entity_links SET canonical_id=?, match_kind='fuzzy_merge' WHERE canonical_id=?", (survivor, loser)) # repoint relationship edges (member_of, etc.); OR IGNORE avoids unique clashes, # then drop any leftover edges the survivor already had. c.execute("UPDATE OR IGNORE relationship_edges SET src_id=? WHERE src_id=?", (survivor, loser)) c.execute("UPDATE OR IGNORE relationship_edges SET dst_id=? WHERE dst_id=?", (survivor, loser)) c.execute("DELETE FROM relationship_edges WHERE src_id=? OR dst_id=?", (loser, loser)) c.execute("UPDATE canonical_entities SET deleted_at=?, updated_at=? WHERE id=?", (_now(), _now(), loser)) def decide(db, candidate_id, decision, decided_by): if decision not in ("approve", "reject"): return {"error": "bad_decision", "allowed": ["approve", "reject"]} c = _conn(db) cand = c.execute("SELECT * FROM entity_merge_candidates WHERE id=?", (candidate_id,)).fetchone() if not cand: c.close() return {"error": "not_found", "candidate_id": candidate_id} if cand["status"] != "pending": c.close() return {"error": "already_decided", "status": cand["status"]} if decision == "approve": _apply_merge(c, cand["entity_a"], cand["entity_b"], cand["confidence"], cand["reason"]) c.execute("UPDATE entity_merge_candidates SET status=?, decided_by=?, decided_at=? WHERE id=?", ("approved" if decision == "approve" else "rejected", decided_by, _now(), candidate_id)) _log(c, decided_by, f"entity.merge_{decision}d", cand["entity_a"], {"survivor": cand["entity_a"], "loser": cand["entity_b"], "names": [cand["name_a"], cand["name_b"]]}) c.commit() c.close() return {"id": candidate_id, "decision": decision, "merged": (cand["entity_b"] if decision == "approve" else None)}