Files
Keysat cd3cca725c Phase 1: dual approval default, web-UI index jobs + merge review queue, thesis v2
- Dual sign-off is now the default (thesis_required_approvals defaults to 2).
- Entity-merge review queue (migration 0003): the fuzzy/Qwen tier no longer
  auto-merges — it writes CANDIDATES (entity_merge_candidates) with a same/different
  suggestion + confidence + reason for a human to approve (merge) or reject (keep
  separate). entity_merge.py applies/rejects (durable via entity_merges, soft-delete,
  repoint links+edges); decided pairs aren't re-surfaced.
- entity_jobs.py: UI-triggered background index jobs (rebuild/update/find-duplicates)
  as subprocesses with a one-at-a-time lock; status in /api/system/status.
- server.py: /api/index/{rebuild,update}, /api/entities/find-duplicates,
  /api/entities/merge-candidates [+ /{id} decide] — admin-gated.
- docs/thesis-seed-v2.md: concrete, plain-English rewrite per Grant's feedback.

Backend verified end-to-end on synthetic data (candidate gen -> approve/reject).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 11:14:12 -05:00

156 lines
6.6 KiB
Python

"""Human-gated thesis approval (Phase 1 Workstream E).
NOT an agent tool — called only by authenticated CRM routes in server.py. The
Architect can draft and submit; only a human partner can promote a version to
canonical. Supports Grant's dual-sign-off + collaborative feedback: both partners
can leave reviews (approve / request_changes / comment with free-text the
Architect reads to iterate); a version promotes to canonical once distinct
'approve' reviewers reach `thesis_required_approvals` (app_settings, default 2 =
dual sign-off; set to 1 for single-approver). Promotion atomically supersedes the prior canonical,
honoring the one-canonical-per-line DB invariant. Everything is logged.
"""
import json
import sqlite3
import uuid
from datetime import datetime, timezone
# Review decisions accepted by record_review(); any other value is rejected
# there with a "bad_decision" error before the database is touched.
_VALID = ("approve", "request_changes", "comment")
def _now():
    """Return the current UTC time as an ISO-8601 string ending in "Z"."""
    utc_aware = datetime.now(timezone.utc)
    # Strip tzinfo so isoformat() emits no "+00:00" offset; the "Z" suffix
    # appended below marks the value as UTC instead.
    utc_naive = utc_aware.replace(tzinfo=None)
    return utc_naive.isoformat() + "Z"
def _conn(db):
    """Open a SQLite connection to `db` with named-column rows and FK enforcement on.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = sqlite3.connect(db)
    # sqlite3.Row lets callers read columns by name (row["id"]) or by position.
    connection.row_factory = sqlite3.Row
    # Turn on foreign-key enforcement for this connection.
    connection.execute("PRAGMA foreign_keys=ON")
    return connection
def _log(c, actor_id, action, target_id, payload):
    """Append one audit row to interaction_log for a human action on a thesis.

    The row is written on the caller's connection `c` with actor_type "human",
    target_type "thesis" and source "crm_ui". `payload` is stored as JSON text,
    or NULL when it is None. No commit happens here; that is left to the caller.
    """
    entry_id = str(uuid.uuid4())
    logged_at = _now()
    encoded_payload = None if payload is None else json.dumps(payload)
    # created_at is taken from its own clock read, after ts.
    values = (entry_id, logged_at, "human", actor_id, action, "thesis", target_id,
              encoded_payload, "crm_ui", _now())
    c.execute("""INSERT INTO interaction_log
(id, ts, actor_type, actor_id, action, target_type, target_id, payload, source, created_at)
VALUES (?,?,?,?,?,?,?,?,?,?)""", values)
def required_approvals(c):
    """Return how many distinct approvers a version needs before it is promoted.

    Reads the JSON value stored under 'thesis_required_approvals' in
    app_settings. The default is 2 (dual sign-off, both partners); set the
    setting to 1 for single-approver. A missing row, or a value that cannot be
    parsed and converted to an int, falls back to 2. Parsed values are clamped
    to a minimum of 1.
    """
    fallback = 2
    setting = c.execute(
        "SELECT value_json FROM app_settings WHERE key='thesis_required_approvals'"
    ).fetchone()
    if not setting:
        return fallback
    try:
        configured = int(json.loads(setting[0]))
    except Exception:
        # Any parse/convert failure (bad JSON, NULL, non-numeric) uses the default.
        return fallback
    return max(1, configured)
def _approver_count(c, version_id):
    """Count the distinct reviewers who recorded an 'approve' on `version_id`."""
    cursor = c.execute(
        "SELECT DISTINCT reviewer_user_id FROM thesis_reviews WHERE version_id=? AND decision='approve'",
        (version_id,))
    approvers = set()
    for row in cursor:
        approvers.add(row[0])
    return len(approvers)
def _promote(c, version_row, approver_user_id):
    """Make `version_row` the canonical version of its thesis line.

    Any existing canonical version of the same line is first marked
    'superseded' (pointing at the new version), and only then is the new
    version marked 'canonical'. Superseding first keeps the one-canonical
    unique index satisfied. A "thesis.approved" entry is logged for
    `approver_user_id`. All writes go through the caller's connection `c`;
    no commit happens here.
    """
    line_id = version_row["line_id"]
    current = c.execute(
        "SELECT id FROM thesis_versions WHERE line_id=? AND status='canonical'", (line_id,)
    ).fetchone()
    new_id = version_row["id"]
    superseded_id = None
    if current:
        superseded_id = current["id"]
        c.execute("UPDATE thesis_versions SET status='superseded', superseded_by=? WHERE id=?",
                  (new_id, superseded_id))
    c.execute("UPDATE thesis_versions SET status='canonical', approved_at=? WHERE id=?",
              (_now(), new_id))
    _log(c, approver_user_id, "thesis.approved", new_id,
         {"line_id": line_id, "superseded": superseded_id})
def record_review(db, version_id, reviewer_user_id, decision, feedback=None, target_node_id=None):
    """Record a partner's review. Promotes to canonical when approve-threshold met.

    `reviewer_user_id` MUST be a real authenticated human (enforced by the route).

    Args:
        db: database location passed to `_conn`.
        version_id: id of the thesis_versions row being reviewed.
        reviewer_user_id: id of the reviewing user.
        decision: one of "approve", "request_changes", "comment".
        feedback: optional free-text stored with the review.
        target_node_id: optional node id stored with the review.

    Returns:
        {"error": "bad_decision", "allowed": [...]} for an unknown decision,
        {"error": "not_found", "version_id": ...} when the version is missing,
        otherwise a dict with "version_id", "decision", "approvals" (distinct
        approvers after this review), "required" and "promoted_to_canonical".

    The review, its log entry and any promotion are committed together. If any
    step raises, nothing is committed and the exception propagates.
    """
    if decision not in _VALID:
        return {"error": "bad_decision", "allowed": list(_VALID)}
    c = _conn(db)
    # try/finally: the connection used to leak whenever a statement below raised
    # (e.g. a constraint violation on the INSERT or a locked database).
    try:
        v = c.execute("SELECT * FROM thesis_versions WHERE id=?", (version_id,)).fetchone()
        if not v:
            return {"error": "not_found", "version_id": version_id}
        c.execute("""INSERT INTO thesis_reviews (id, version_id, reviewer_user_id, decision, feedback, target_node_id, created_at)
VALUES (?,?,?,?,?,?,?)""",
                  (str(uuid.uuid4()), version_id, reviewer_user_id, decision, feedback,
                   target_node_id, _now()))
        _log(c, reviewer_user_id, f"thesis.review.{decision}", version_id,
             {"feedback": feedback, "target_node_id": target_node_id})
        promoted = False
        # Counted after the INSERT, so this review is included in the tally.
        approvals = _approver_count(c, version_id)
        need = required_approvals(c)
        # Only an 'approve' on a draft / in_review version can trigger promotion.
        if decision == "approve" and v["status"] in ("draft", "in_review") and approvals >= need:
            _promote(c, v, reviewer_user_id)
            promoted = True
        c.commit()
    finally:
        c.close()
    return {"version_id": version_id, "decision": decision, "approvals": approvals,
            "required": need, "promoted_to_canonical": promoted}
# ── reads for the review UI ───────────────────────────────────────────────────
def list_lines(db):
    """Return all non-deleted thesis lines for the review UI.

    Returns:
        {"lines": [...]} where each entry is a dict with id, line_key, name,
        segment_key, is_core and status, ordered by is_core descending and
        then by name.
    """
    c = _conn(db)
    # try/finally so the connection is closed even if the query raises.
    try:
        rows = [dict(r) for r in c.execute(
            "SELECT id, line_key, name, segment_key, is_core, status FROM thesis_lines WHERE deleted_at IS NULL ORDER BY is_core DESC, name")]
    finally:
        c.close()
    return {"lines": rows}
def list_versions_for_review(db):
    """Return every version currently 'in_review', newest first, with approval tallies.

    Returns:
        {"versions": [...]} where each entry carries the version and line
        columns selected below plus "approvals" (distinct approvers so far)
        and "required" (the approval threshold).
    """
    c = _conn(db)
    # try/finally so the connection is closed even if a query raises.
    try:
        # The threshold is the same for every row, so read it once up front.
        need = required_approvals(c)
        rows = []
        for v in c.execute("""SELECT v.id, v.line_id, l.line_key, l.name, v.version_no, v.status, v.created_at, v.rationale
FROM thesis_versions v JOIN thesis_lines l ON l.id=v.line_id
WHERE v.status='in_review' ORDER BY v.created_at DESC"""):
            d = dict(v)
            d["approvals"] = _approver_count(c, v["id"])
            d["required"] = need
            rows.append(d)
    finally:
        c.close()
    return {"versions": rows}
def get_canonical(db, line_key):
    """The one canonical version's frozen body_json; fails closed if none.

    Returns:
        {"status": "no_such_line", ...} when no non-deleted line has `line_key`,
        {"status": "no_canonical_thesis", ...} when the line has no canonical
        version, otherwise {"status": "ok", "line_key", "version_id",
        "version_no", "approved_at", "thesis"} with "thesis" being the parsed
        body_json.

    Raises:
        Whatever json.loads raises if the stored body_json is not valid JSON.
    """
    c = _conn(db)
    # try/finally closes the connection on every path, including query errors.
    try:
        line = c.execute("SELECT id FROM thesis_lines WHERE line_key=? AND deleted_at IS NULL",
                         (line_key,)).fetchone()
        if not line:
            return {"status": "no_such_line", "line_key": line_key}
        r = c.execute("SELECT * FROM thesis_versions WHERE line_id=? AND status='canonical'",
                      (line["id"],)).fetchone()
    finally:
        c.close()
    if not r:
        return {"status": "no_canonical_thesis", "line_key": line_key}
    return {"status": "ok", "line_key": line_key, "version_id": r["id"], "version_no": r["version_no"],
            "approved_at": r["approved_at"], "thesis": json.loads(r["body_json"])}
def get_version(db, version_id):
    """Return one thesis version with its parsed body, reviews and approval tally.

    Returns:
        {"error": "not_found", "version_id": ...} when no such version exists.
        Otherwise the version's columns as a dict, with "body_json" replaced by
        "body" (the parsed JSON, or None if it cannot be parsed), plus
        "reviews" (oldest first), "approvals" and "required".
    """
    c = _conn(db)
    # try/finally so the connection is closed even if a later query raises.
    try:
        v = c.execute("SELECT * FROM thesis_versions WHERE id=?", (version_id,)).fetchone()
        if not v:
            return {"error": "not_found", "version_id": version_id}
        out = dict(v)
        try:
            out["body"] = json.loads(v["body_json"])
        except Exception:
            # Best-effort: an unreadable body is shown as None rather than failing the view.
            out["body"] = None
        out.pop("body_json", None)
        out["reviews"] = [dict(r) for r in c.execute(
            "SELECT reviewer_user_id, decision, feedback, target_node_id, created_at FROM thesis_reviews WHERE version_id=? ORDER BY created_at",
            (version_id,))]
        out["approvals"] = _approver_count(c, version_id)
        out["required"] = required_approvals(c)
    finally:
        c.close()
    return out