Files
ten31-database/backend/entity_jobs.py
T
Keysat cd3cca725c Phase 1: dual approval default, web-UI index jobs + merge review queue, thesis v2
- Dual sign-off is now the default (thesis_required_approvals defaults to 2).
- Entity-merge review queue (migration 0003): the fuzzy/Qwen tier no longer
  auto-merges — it writes CANDIDATES (entity_merge_candidates) with a same/different
  suggestion + confidence + reason for a human to approve (merge) or reject (keep
  separate). entity_merge.py applies/rejects (durable via entity_merges, soft-delete,
  repoint links+edges); decided pairs aren't re-surfaced.
- entity_jobs.py: UI-triggered background index jobs (rebuild/update/find-duplicates)
  as subprocesses with a one-at-a-time lock; status in /api/system/status.
- server.py: /api/index/{rebuild,update}, /api/entities/find-duplicates,
  /api/entities/merge-candidates [+ /{id} decide] — admin-gated.
- docs/thesis-seed-v2.md: concrete, plain-English rewrite per Grant's feedback.

Backend verified end-to-end on synthetic data (candidate gen -> approve/reject).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 11:14:12 -05:00

66 lines
2.3 KiB
Python

"""Background index / entity-resolution jobs, triggered from the CRM web UI.
Replaces the StartOS actions with one-click web-UI buttons. Runs the ingest
scripts as SUBPROCESSES (cwd backend/ingest) so the heavy deps (fastembed, the
Spark clients) are not imported into the CRM server process, and so each run is
isolated. One job at a time (a process-level lock); progress/result is exposed
via get_status() and surfaced in /api/system/status.
Jobs:
    rebuild_index    — full re-chunk + re-embed into Qdrant (sync.py --recreate)
    update_index     — incremental sync (sync.py)
    find_duplicates  — deterministic resolution + Qwen suggestions -> review queue
"""
import os
import subprocess
import threading
from datetime import datetime, timezone
# The ingest scripts live in the "ingest" directory next to this module; every
# job subprocess is started with this as its working directory.
INGEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest")

# Job kind -> base argv. _run() appends "--db <db_path>" before launching.
_JOBS = {
    "rebuild_index": ["python3", "sync.py", "--recreate"],
    "update_index": ["python3", "sync.py"],
    "find_duplicates": ["python3", "fuzzy_resolve.py"],
}

# Protects _state. It is only held for short bookkeeping updates, never for the
# lifetime of a job; the "one job at a time" rule is the _state["running"] flag.
_lock = threading.Lock()

# Status of the current (or most recent) job, copied out by get_status().
_state = dict(
    running=False,
    kind=None,
    started_at=None,
    finished_at=None,
    result=None,
    tail=None,
)
def _now():
    """Return the current UTC time as an ISO-8601 string ending in "Z".

    Example shape: ``2026-06-05T16:14:12.123456Z`` (the fractional part is
    omitted when the microsecond field happens to be zero).
    """
    stamp = datetime.now(timezone.utc).isoformat()
    # An aware UTC datetime renders its offset as "+00:00"; use the "Z" form.
    return stamp.replace("+00:00", "Z")
def get_status():
    """Return a snapshot of the current/most recent job's state.

    The copy is taken while holding ``_lock``, so the caller sees a consistent
    set of fields and may mutate the returned dict freely without touching
    the module's internal state.
    """
    with _lock:
        snapshot = _state.copy()
    return snapshot
def start(kind, db_path):
    """Launch background job *kind* against the database at *db_path*.

    Returns immediately with one of:
      * ``{"started": True, "kind": kind}`` - the job is now running on a
        daemon thread (see ``_run``);
      * ``{"error": "unknown_job", "allowed": [...]}`` - *kind* is not a key
        of ``_JOBS``;
      * ``{"error": "job_running", "kind": <kind>}`` - another job is still
        running (only one job may run at a time).

    Raises:
        RuntimeError: if the worker thread cannot be started. The job slot is
            released first (state records result "error" with the exception
            text as ``tail``), so later calls are not blocked.
    """
    if kind not in _JOBS:
        return {"error": "unknown_job", "allowed": list(_JOBS)}
    with _lock:
        if _state["running"]:
            return {"error": "job_running", "kind": _state["kind"]}
        _state.update(running=True, kind=kind, started_at=_now(),
                      finished_at=None, result=None, tail=None)
        worker = threading.Thread(target=_run, args=(kind, db_path), daemon=True)
        try:
            # Starting under _lock is safe: _run only takes the lock when the
            # job finishes. It also lets a failed start be rolled back
            # atomically.
            worker.start()
        except Exception as exc:
            # Without this rollback the slot would stay "running" forever and
            # every subsequent start() would answer job_running.
            _state.update(running=False, finished_at=_now(),
                          result="error", tail=str(exc))
            raise
    return {"started": True, "kind": kind}
def _output_tail(stdout, stderr, limit=1500):
    """Return the last *limit* characters of stdout followed by stderr, stripped.

    Either stream may be ``str``, ``bytes`` or ``None``. A normal
    ``subprocess.run(..., text=True)`` result carries ``str``, while the
    ``TimeoutExpired`` it raises may carry ``bytes`` (decoded here as UTF-8,
    replacing undecodable bytes) or ``None``.
    """
    pieces = []
    for stream in (stdout, stderr):
        if isinstance(stream, bytes):
            stream = stream.decode("utf-8", errors="replace")
        pieces.append(stream or "")
    return "".join(pieces).strip()[-limit:]


def _run(kind, db_path):
    """Run job *kind* to completion and record its outcome in ``_state``.

    Executed on the daemon thread spawned by ``start``. The command is
    ``_JOBS[kind] + ["--db", db_path]``, run with cwd ``INGEST_DIR`` and
    ``CRM_DB_PATH=db_path`` added to a copy of the environment, with a
    one-hour (3600 s) timeout.

    On exit, whatever happened, ``_state`` gets ``running=False``,
    ``finished_at`` and:
      * ``result``: "ok", "error (exit N)", "error (timeout)" or "error"
        (the process could not be run);
      * ``tail``: the last 1500 characters of combined stdout+stderr, or the
        exception text (for timeouts, partial output followed by the message).
    Never raises for ordinary failures; errors are reported through the status.
    """
    result, tail = "error", None
    try:
        cmd = _JOBS[kind] + ["--db", db_path]
        env = dict(os.environ)
        env["CRM_DB_PATH"] = db_path
        proc = subprocess.run(cmd, cwd=INGEST_DIR, env=env, capture_output=True,
                              text=True, timeout=3600)
        tail = _output_tail(proc.stdout, proc.stderr)
        result = "ok" if proc.returncode == 0 else f"error (exit {proc.returncode})"
    except subprocess.TimeoutExpired as exc:
        # run() has already killed the child. Keep what it printed before the
        # deadline (shows how far a long rebuild got), with the timeout message
        # last so the 1500-character cut can never drop it.
        tail = f"{_output_tail(exc.stdout, exc.stderr)}\n{exc}".strip()[-1500:]
        result = "error (timeout)"
    except Exception as exc:  # noqa: BLE001 - reported via status, never raised
        tail, result = str(exc), "error"
    finally:
        # Always release the job slot, or start() would refuse new jobs forever.
        with _lock:
            _state.update(running=False, finished_at=_now(), result=result, tail=tail)