cd3cca725c
- Dual sign-off is now the default (thesis_required_approvals defaults to 2).
- Entity-merge review queue (migration 0003): the fuzzy/Qwen tier no longer
auto-merges — it writes CANDIDATES (entity_merge_candidates) with a same/different
suggestion + confidence + reason for a human to approve (merge) or reject (keep
separate). entity_merge.py applies/rejects (durable via entity_merges, soft-delete,
repoint links+edges); decided pairs aren't re-surfaced.
- entity_jobs.py: UI-triggered background index jobs (rebuild/update/find-duplicates)
as subprocesses with a one-at-a-time lock; status in /api/system/status.
- server.py: /api/index/{rebuild,update}, /api/entities/find-duplicates,
/api/entities/merge-candidates [+ /{id} decide] — admin-gated.
- docs/thesis-seed-v2.md: concrete, plain-English rewrite per Grant's feedback.
Backend verified end-to-end on synthetic data (candidate gen -> approve/reject).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
66 lines
2.3 KiB
Python
66 lines
2.3 KiB
Python
"""Background index / entity-resolution jobs, triggered from the CRM web UI.
|
|
|
|
Replaces the StartOS actions with one-click web-UI buttons. Runs the ingest
|
|
scripts as SUBPROCESSES (cwd backend/ingest) so the heavy deps (fastembed, the
|
|
Spark clients) are not imported into the CRM server process, and so each run is
|
|
isolated. One job at a time (a process-level lock); progress/result is exposed
|
|
via get_status() and surfaced in /api/system/status.
|
|
|
|
Jobs:
|
|
rebuild_index — full re-chunk + re-embed into Qdrant (sync.py --recreate)
|
|
update_index — incremental sync (sync.py)
|
|
find_duplicates — deterministic resolution + Qwen suggestions -> review queue
|
|
"""
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
|
|
INGEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ingest")
|
|
|
|
_JOBS = {
|
|
"rebuild_index": ["python3", "sync.py", "--recreate"],
|
|
"update_index": ["python3", "sync.py"],
|
|
"find_duplicates": ["python3", "fuzzy_resolve.py"],
|
|
}
|
|
|
|
_lock = threading.Lock()
|
|
_state = {"running": False, "kind": None, "started_at": None,
|
|
"finished_at": None, "result": None, "tail": None}
|
|
|
|
|
|
def _now():
|
|
return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
|
|
|
|
|
|
def get_status():
    """Return a snapshot of the job state.

    The snapshot is a new dict taken while holding ``_lock``, so mutating the
    returned value does not affect the module's own state.
    """
    with _lock:
        snapshot = _state.copy()
    return snapshot
|
|
|
|
|
|
def start(kind, db_path):
    """Launch job *kind* on a background thread unless one is already running.

    Args:
        kind: Key of ``_JOBS`` naming the job to run.
        db_path: Database path passed through to ``_run``.

    Returns:
        ``{"started": True, "kind": kind}`` when the worker thread was started;
        ``{"error": "unknown_job", "allowed": [...]}`` if *kind* is not a key
        of ``_JOBS``;
        ``{"error": "job_running", "kind": <kind>}`` if a job is in progress.

    Raises:
        Exception: whatever ``Thread.start()`` raised (for example
            ``RuntimeError`` when no new thread can be created). The job slot
            is released before the exception propagates.
    """
    if kind not in _JOBS:
        return {"error": "unknown_job", "allowed": list(_JOBS)}

    # Claim the single job slot under the lock so two concurrent callers
    # cannot both see running == False.
    with _lock:
        if _state["running"]:
            return {"error": "job_running", "kind": _state["kind"]}
        _state.update(running=True, kind=kind, started_at=_now(),
                      finished_at=None, result=None, tail=None)

    worker = threading.Thread(target=_run, args=(kind, db_path), daemon=True)
    try:
        worker.start()
    except Exception as exc:  # noqa: BLE001
        # _run never executes in this case, so nothing else would clear the
        # flag; without this reset every later call would report job_running.
        with _lock:
            _state.update(running=False, finished_at=_now(),
                          result="error", tail=str(exc))
        raise
    return {"started": True, "kind": kind}
|
|
|
|
|
|
def _as_text(chunk):
    """Coerce captured subprocess output (``str``, ``bytes`` or ``None``) to ``str``."""
    if chunk is None:
        return ""
    if isinstance(chunk, bytes):
        return chunk.decode("utf-8", errors="replace")
    return chunk


def _run(kind, db_path):
    """Run job *kind* as a subprocess and record its outcome in ``_state``.

    The script is executed with ``INGEST_DIR`` as its working directory,
    ``--db <db_path>`` appended to its argv and ``CRM_DB_PATH`` set in its
    environment. It is given at most 3600 seconds.

    On completion ``_state`` is updated with ``running=False``, a
    ``finished_at`` timestamp, a ``result`` and a ``tail``:

    * exit code 0      -> result ``"ok"``
    * non-zero exit    -> result ``"error (exit N)"``
    * timeout          -> result ``"error (timeout)"``
    * any other error  -> result ``"error"``, tail is the exception text

    For the first three cases ``tail`` is the last 1500 characters of the
    combined stdout + stderr (plus the timeout message when timed out).

    Args:
        kind: Key of ``_JOBS`` naming the job to run.
        db_path: Database path forwarded to the script.
    """
    tail, result = None, "error"
    try:
        # Built inside the try so that a failure here is also reported and
        # cannot leave the running flag set.
        cmd = _JOBS[kind] + ["--db", db_path]
        env = dict(os.environ)
        env["CRM_DB_PATH"] = db_path
        p = subprocess.run(cmd, cwd=INGEST_DIR, env=env, capture_output=True,
                           text=True, timeout=3600)
        tail = (p.stdout + p.stderr).strip()[-1500:]
        result = "ok" if p.returncode == 0 else f"error (exit {p.returncode})"
    except subprocess.TimeoutExpired as exc:
        # Keep whatever the script printed before it was killed. The captured
        # output on this exception can be bytes even with text=True, hence
        # the coercion.
        captured = (_as_text(exc.stdout) + _as_text(exc.stderr)).strip()
        tail = f"{captured}\n{exc}".strip()[-1500:]
        result = "error (timeout)"
    except Exception as exc:  # noqa: BLE001
        tail, result = str(exc), "error"
    finally:
        # Always release the job slot, whatever happened above.
        with _lock:
            _state.update(running=False, finished_at=_now(), result=result,
                          tail=tail)
|